FLINK-3679

Allow Kafka consumer to skip corrupted messages

    Details

      Description

      There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.

      The main issue is simply that the API assumes a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say from Kafka) might actually map to zero or more logical elements in the pipeline.

      Particularly important here is the case where you receive a message from a source (such as Kafka) and say the raw bytes don't deserialize properly. Right now the only recourse is to throw IOException and therefore fail the job.

      This is definitely not good, since bad data is a reality and failing the job is not the right option. If the job fails, we'll just end up replaying the bad data and the whole thing will start again.

      Instead, in this case it would be best if the user could just return the empty set.

      The other case is where one input message should logically be multiple output messages. This case is probably less important, since there are other ways to do this, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.

      Maybe we need to support a DeserializationSchema variant that has semantics more like that of FlatMap.
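      For illustration only, here is a minimal sketch of what such a FlatMap-style schema could look like (the interface name and exact signature are hypothetical, not an existing Flink API):

```
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.util.Collector;

/**
 * Hypothetical schema variant: one raw message may produce zero, one,
 * or many elements by pushing them into the Collector.
 */
public interface FlatDeserializationSchema<T> extends Serializable {

    /**
     * Deserializes one raw message. A corrupted message can simply emit
     * nothing instead of throwing an IOException and failing the job.
     */
    void deserialize(byte[] message, Collector<T> out) throws IOException;
}
```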

        Issue Links

          Activity

          rmetzger Robert Metzger added a comment -

          I had a quick offline chat about this with Stephan Ewen. Changing the semantics of the DeserializationSchema to use an OutputCollector would be possible, but it would break existing code, introduce a new class and make the locking / operator chaining of the Kafka consumer code more complicated.
          I wonder if the problems you've mentioned can't be solved with a flatMap() operator. When the Kafka consumer and the flatMap() are executed with the same parallelism, they'll be chained together and then executed in the same thread with almost no overhead.
          If one Kafka message results in two or more logical messages, that "splitting" can be done in the flatMap() as well. For invalid records, this can also be reflected in the returned record (with a failure flag such as an id set to -1 or a boolean set to false, or a special field in a JSON record) and then treated accordingly in the flatMap() call.

          If you want, we can keep the JIRA issue open and see if more users run into this. If so, we can reconsider fixing it (I'm not saying I've decided against fixing it).
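          To make the workaround concrete, here is a rough sketch of the sentinel approach described above (the ParsedRecord type and class names are invented for this example; a real pipeline would use its own types):

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical wrapper type produced by the DeserializationSchema:
// 'valid' is false when the raw bytes could not be parsed.
public class ParsedRecord {
    public boolean valid;
    public String payload;
}

// Chained flatMap that silently drops invalid records instead of failing the job.
public class DropCorruptRecords implements FlatMapFunction<ParsedRecord, String> {
    @Override
    public void flatMap(ParsedRecord record, Collector<String> out) {
        if (record.valid) {
            out.collect(record.payload);
        }
        // corrupted records are skipped here (they could also be counted or logged)
    }
}
```

          When the consumer and this flatMap run with the same parallelism they are chained, so the filtering happens in the same thread, as noted above.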

          jgrier Jamie Grier added a comment - - edited

          I'm not sure about the locking and operator chaining issues, so if this change makes those unduly complicated, maybe it's not worth it. However, a DeserializationSchema with more flatMap()-like semantics would certainly be the better API, given that bad data is a reality. It also seems we could provide this without breaking existing code, though it would certainly add a bit more complexity to the API (having multiple variants for this).

          Anyway, I agree you can work around this issue by making a special "sentinel" value and dealing with all of this in a chained flatMap() operator. I imagine that's exactly the approach that people are already using.

          rmetzger Robert Metzger added a comment -

          Two users were affected by this recently:
          http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handle-deserialization-error-td8724.html#a8725
          http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handling-Kafka-DeserializationSchema-exceptions-td8700.html
          I think we need to fix this issue.
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          +1 to fix the issue; the proposed changes seem reasonable and head towards a better API. The Kinesis consumer will need to adapt to this change as well, as it also accepts a DeserializationSchema.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Hi Haohui Mai!
          Thank you for picking this JIRA up. How are you doing with this work?

          Since the previous discussion didn't really come to a conclusion on the API changes for this feature yet, can you briefly describe how you plan to add this? We might need to be extra careful in how the new API works with the state update locking in the consumer.
          (Sorry, I just realized you have some proposals already in FLINK-5583. I'll move the API discussion there.)

          Please also feel free to reach out to us if you want to bounce around some ideas or if you bump into any problems with this.

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3314

          FLINK-3679 DeserializationSchema should handle zero or more outputs

          This PR adds a new interface, `RichKeyedDeserializationSchema`, to enable the deserializer to produce zero or more outputs. The main use case is skipping corrupted messages in the Kafka stream.

          Feedback (especially on backward compatibility) is highly appreciated.
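          A rough sketch of the shape of the proposed interface, pieced together from the diffs quoted in the review comments further down (details such as the ResultTypeQueryable bound may differ from the actual PR):

```
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

// Approximate shape of the interface added by the PR; see the review diffs below.
public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes one Kafka record into zero or more elements.
     * A corrupted record can be skipped by collecting nothing.
     */
    void deserialize(
        byte[] messageKey,
        byte[] message,
        String topic,
        int partition,
        long offset,
        Collector<T> collector) throws IOException;

    /**
     * Returns true when the given element signals the end of the stream.
     */
    boolean isEndOfStream(T nextElement);
}
```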

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-3679

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3314.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3314


          commit 7728acb3bc00a12a7552706be569710fbfdbd200
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-02-14T22:19:29Z

          FLINK-3679 DeserializationSchema should handle zero or more outputs for every input.


          wheat9 Haohui Mai added a comment -

          Tzu-Li (Gordon) Tai – would you mind taking a look?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Hi Haohui Mai, sure! I've noticed your PR, and will schedule some time next week to review it.
          Thank you for the reminder.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102048475

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          I'm not sure of the performance implications for this. The JVM will create a Collector instance for each record read from Kafka.
          I wonder if we can re-use one collector instance here.

          Also, I wonder if we need to use this `Collector` implementation, with a `close()` method we are not using and an exception we are turning into a `RuntimeException`. Maybe we should let `collect()` throw an exception?
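          To illustrate that second question, a collector whose collect() is allowed to throw might look like the following (a hypothetical interface, not an existing Flink class):

```
import java.io.IOException;

// Hypothetical alternative to org.apache.flink.util.Collector: collect() may throw
// directly, so the fetcher would not have to wrap failures from emitRecord(...)
// into a RuntimeException inside the anonymous Collector.
public interface ThrowingCollector<T> {

    void collect(T element) throws IOException;
}
```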

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102048656

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
          final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

          // get the records for each topic partition

-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+			for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {

 				List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 						records.records(partition.getKafkaPartitionHandle());

-				for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-					final T value = deserializer.deserialize(
-							record.key(), record.value(),
-							record.topic(), record.partition(), record.offset());
-
-					if (deserializer.isEndOfStream(value)) {
-						// end of stream signaled
-						running = false;
-						break;
-					}
-
-					// emit the actual record. this also updates offset state atomically
-					// and deals with timestamps and watermark generation
-					emitRecord(value, partition, record.offset(), record);
+				for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          Same question as in the Kafka 0.8 impl

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3314

          Thank you for opening a pull request.
          I think the change is missing an update to the documentation. I did a very superficial review of the change; this needs a more thorough check.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102299038

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          Totally agree. I played around with this a little bit, and it might require some trade-offs here.

          The problem is that `emitRecord()` needs the state for each record (e.g., topic partition, offset, etc.). The state can either be passed inside a closure (like the new instance of the `Collector`) or passed through arguments. I see three possibilities here:

          1. Create a new instance of `Collector` for every record. The JVM may or may not be able to optimize it. A trace-based JVM should be able to, but I'm not sure about a class-based JVM.

          2. Expose the internal state in the `collect()` call. The `collect()` call takes additional parameters such as the offset and partition state. It reduces the GC overhead but also hinders changing the implementation.

          3. Create a new interface like `Optional<T> deserialize(byte[] messageKey, ...)` (or
          `void deserialize(byte[] messageKey, ..., AtomicReference<T> result)` to optimize away the cost of the `Optional` class). It results in a slightly more complex API, but it probably has the best trade-off between performance and API compatibility.

          What do you think?
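          To make option 3 concrete, here is a hedged sketch of what such a signature might look like (purely illustrative of the alternative being discussed, not code from the PR):

```
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;

// Option 3 sketched: at most one element per raw message, with
// Optional.empty() signalling "skip this (corrupted) record".
public interface OptionalDeserializationSchema<T> extends Serializable {

    Optional<T> deserialize(
        byte[] messageKey,
        byte[] message,
        String topic,
        int partition,
        long offset) throws IOException;
}
```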

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102542018

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          @StephanEwen What is your opinion on solving this problem?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102665687

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          Moving the discussion back a bit:

          I don't think this implementation works correctly with exactly-once, given how we checkpoint the consumer's partition offset state.

          The problem is that, in `emitRecord`, we will be updating the offset state. In the changes here, what this means is that we will be considering a record to have been fully processed as soon as the collector collects something.

          For example, let's say the deserializer will call `collect` 3 times for elements deserialized from record R before `deserialize` returns, and R has offset 100L. As soon as the first element is collected, the state will be updated to `finished processing offset 100L`. If checkpointing is now triggered, and we use that checkpoint to restore, we will skip the remaining 2 elements that were yet to be collected.
          Once

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102668004

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          What I think we should do to solve this correctly:

          Buffer the elements collected from the `deserialize` call. The `Collector.collect` implementation should simply add the collected element to the buffer, and not emit it immediately.

          After `deserialize` returns, call `emitRecord` once with all the elements in the buffer and the original record's offset. This, of course, would mean we need to change the `emitRecord` implementation slightly, to something like:
```
void emitRecord(List<T> records, KafkaTopicPartitionState<KPH> partitionState, long offset) {
	synchronized (checkpointLock) {
		for (T record : records) {
			sourceContext.collect(record);
		}
		partitionState.setOffset(offset);
	}
}
```

          After this, we proceed with the next record and repeat. Note that the emitting of all produced elements from the record at offset 100L and the update of the offset state to 100L happen atomically, synchronized on the checkpoint lock, so we can make sure that a checkpoint barrier will only come either before or after all the produced records of offset 100, and not in between.

          I think we should also be able to avoid a per-record `Collector` with this solution. We can reuse a `Collector` and provide it to the `deserializer` for every record, because it's simply a means to collect elements into the internal buffer, and we're not calling `emitRecords` in it.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102830609

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          Good catch, @tzulitai !

          I tried the buffer approach and had no luck. The problem is that calling `emitRecord` needs to pass in both the offset and the record itself (the record is used to extract the timestamp in the Kafka 0.10 consumer). The buffer would then need to hold both the deserialized value and the record itself, so it cannot solve the problem of having a collector per record.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102881092

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          @haohui hmm this seems a bit odd to me. I think it should be achievable.

```
// the buffer; this can be shared
final List<T> bufferedElements = new LinkedList<>();
// BufferCollector is an implementation of Collector that adds collected
// elements to bufferedElements; this can be shared
final BufferCollector collector = new BufferCollector<T>(bufferedElements);

...

for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
	deserializer.deserialize(
		record.key(), record.value(),
		record.topic(), record.partition(), record.offset(), collector);
	emitRecords(bufferedElements, partitionState, record.offset(), record);
	// after the elements for the record have been emitted, empty out the buffer
	bufferedElements.clear();
}
```

          Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something. Let me know what you think
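          For completeness, here is a minimal sketch of the BufferCollector helper referenced in the snippet above (hypothetical as written here; the actual implementation in the PR may differ):

```
import java.util.List;

import org.apache.flink.util.Collector;

// Minimal sketch: collect() only appends to a shared buffer; nothing is emitted
// or locked here, so a single instance can be reused for every Kafka record.
public class BufferCollector<T> implements Collector<T> {

    private final List<T> buffer;

    public BufferCollector(List<T> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void collect(T element) {
        buffer.add(element);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```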

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102881264

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          I see what you are saying. The trade-off here is handing off the objects one more time, but I think it's okay. I'll update the PR accordingly.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102881632

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -373,16 +370,28 @@ else if (partitionsRemoved)

          { keyPayload.get(keyBytes); }
-					final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
-					if (deserializer.isEndOfStream(value)) {
-						// remove partition from subscribed partitions.
-						partitionsIterator.remove();
-						continue partitionsLoop;
-					}
-					owner.emitRecord(value, currentPartition, offset);
+					final Collector<T> collector = new Collector<T>() {

          — End diff –

          @haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here before you jump back into the code.
          This part is quite critical for the Flink Kafka consumer's exactly-once guarantee, so another pair of eyes on this will be safer.

          I would also like to do a thorough pass over your code to see if there are other problems, so you can work on them all together.

          Is that ok for you? Sorry for some more waiting.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102901299

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -1236,10 +1237,11 @@ public Tuple2WithTopicSchema(ExecutionConfig ec) {
          }

          @Override

-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		public void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
+				Collector<Tuple3<Integer, Integer, String>> collector) throws IOException {

          — End diff –

          Same here: the indentation formatting seems off.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102898788

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
          +import org.apache.flink.util.Collector;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +/**
          + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
          — End diff –

          The name of the class is `RichKeyedDeserializationSchema`, but the Javadocs mention `RichDeserializationSchema`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102900238

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchemaWrapper.java —
          @@ -0,0 +1,50 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.util.Collector;
          +
          +import java.io.IOException;
          +
          +public class RichKeyedDeserializationSchemaWrapper<T> implements RichKeyedDeserializationSchema<T> {
          — End diff –

          Can you also include Javadocs for this class?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102898901

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
          +import org.apache.flink.util.Collector;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +/**
          + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
          + *

           + * {@see KeyedSerializationSchema}

          — End diff –

          I'm not sure why we need to link to `KeyedSerializationSchema` in the Javadocs for the new serialization schema.
          From what I know, we're going to completely replace it, correct?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102900986

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
          final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

          // get the records for each topic partition

-			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+			for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {

 				List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 						records.records(partition.getKafkaPartitionHandle());

-				for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-					final T value = deserializer.deserialize(
-							record.key(), record.value(),
-							record.topic(), record.partition(), record.offset());
-
-					if (deserializer.isEndOfStream(value)) {
-						// end of stream signaled
-						running = false;
-						break;
-					}
-
-					// emit the actual record. this also updates offset state atomically
-					// and deals with timestamps and watermark generation
-					emitRecord(value, partition, record.offset(), record);
+				for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+					final Collector<T> collector = new Collector<T>() {
+						@Override
+						public void collect(T value) {
+							if (deserializer.isEndOfStream(value)) {
+								// end of stream signaled
+								running = false;
+							} else {
+								// emit the actual record. this also updates offset state atomically
+								// and deals with timestamps and watermark generation
+								try {
+									emitRecord(value, partition, record.offset(), record);
+								} catch (Exception e) {
+									throw new RuntimeException(e);
+								}
+							}
+						}
+
+						@Override
+						public void close() {
+
+						}
+					};
+
+					deserializer.deserialize(
+						record.key(), record.value(),
+						record.topic(), record.partition(), record.offset(), collector);

          — End diff –

          The formatting for the list of arguments here could be nicer. Perhaps one argument per line?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102899179

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
          +import org.apache.flink.util.Collector;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +/**
          + * RichDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
          + * @param <T> The type created by the keyed deserialization schema.
          + */
          +public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
          + /**
          + * Deserializes the byte message.
          + *
          + * @param messageKey the key as a byte array (null if no key has been set)
          + * @param message The message, as a byte array. (null if the message was empty or deleted)
          + * @param partition The partition the message has originated from
          + * @param offset the offset of the message in the original source (for example the Kafka offset)
          + *
          + * @return The deserialized message as an object.
          + */
          + void deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
          + Collector<T> collector) throws IOException;
          — End diff –

          The indentation of the parameters here seems a bit off.
          Now that the parameter list is quite lengthy, it might be good style to have one parameter per line.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102896783

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
 	 * @param props
 	 *            The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {

— End diff –

          This will break user-code. We'll need proper usage migration here.

          We have a separate JIRA that aims at deprecating the current Kafka Consumer constructors: https://issues.apache.org/jira/browse/FLINK-5704. The migration to use the new flat-map deserializer can be included there.

          Perhaps for this PR, we should just use your `RichKeyedDeserializationSchemaWrapper` as "behaviour bridges" for the original deserialization schema to the new one, and don't change the original constructor / include new constructors yet, so that we don't overlap and complicate things for FLINK-5704.
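
          For readers following the thread, here is a minimal sketch of what such a "behaviour bridge" wrapper could look like. The class and interface names follow the PR discussion above, but the body below is an assumption for illustration, not the merged code:

```java
package org.apache.flink.streaming.util.serialization;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.io.IOException;

// Sketch only: adapts the existing one-record-per-message KeyedDeserializationSchema
// to the collector-based RichKeyedDeserializationSchema proposed in this PR.
public class RichKeyedDeserializationSchemaWrapper<T> implements RichKeyedDeserializationSchema<T> {

	private final KeyedDeserializationSchema<T> wrapped;

	public RichKeyedDeserializationSchemaWrapper(KeyedDeserializationSchema<T> wrapped) {
		this.wrapped = wrapped;
	}

	@Override
	public void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<T> collector) throws IOException {
		// one input record maps to at most one output record
		T record = wrapped.deserialize(messageKey, message, topic, partition, offset);
		if (record != null) {
			collector.collect(record);
		}
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false; // sketch; a real bridge would delegate: wrapped.isEndOfStream(...)
	}

	@Override
	public TypeInformation<T> getProducedType() {
		return wrapped.getProducedType();
	}
}
```

          With a bridge like this, the existing constructors could keep accepting `KeyedDeserializationSchema` while the fetchers work only against the collector-based interface.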

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102898307

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
@@ -422,6 +429,99 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}

+	@Test
+	public void testRichDeserializationSchema() throws Exception {
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+		final byte[] endPayload = "end".getBytes(StandardCharsets.UTF_8);
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, endPayload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		RichKeyedDeserializationSchema<String> schema = new RichKeyedDeserializationSchema<String>() {
+			@Override
+			public void deserialize(
+					byte[] messageKey, byte[] message, String topic, int partition,
+					long offset, Collector<String> collector) throws IOException {
+				if (offset != 16) {
+					collector.collect(new String(message));
+				}
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return nextElement.equals("end");
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			topics,
+			null, /* no restored state */
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			false, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			StartupMode.GROUP_OFFSETS,
+			false);
+
+		// ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+		final Thread fetcherRunner = new Thread("fetcher runner") {

— End diff –

          We have a nice utility `CheckedThread` that serves for the tested purpose here (catching errors and storing its reference).
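
          For context, `CheckedThread` is a Flink test utility that captures an exception thrown on the thread and rethrows it when the caller synchronizes on it. A rough sketch of how the runner could look with it follows; the constructor and method names are assumptions based on how the utility is typically used, not code from this PR:

```java
// Hedged sketch: replacing the hand-rolled AtomicReference + Thread pattern.
final CheckedThread fetcherRunner = new CheckedThread("fetcher runner") {
	@Override
	public void go() throws Exception {
		// any exception thrown here is captured by CheckedThread
		fetcher.runFetchLoop();
	}
};
fetcherRunner.start();

// ... drive the test, feed records through the handover/mock consumer ...

// sync() waits for the thread and rethrows any captured error, failing the test
fetcherRunner.sync();
```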

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102896891

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java —
@@ -121,7 +122,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
 	 * @param props
 	 *            The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer010(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {

— End diff –

          Same as in the comment in `FlinkKafkaConsumer08`: this breaks user code.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102900064

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          — End diff –

          Since this is now a very Kafka-specific class, I think this is a good time to change the package path to `org.apache.flink.streaming.kafka.serialization`.

          The original `KeyedDeserializationSchema` was placed under `o.a.f.s.util.serialization` because it was wrongly packaged in another module before, and moved to `flink-connector-kafka-base` under the same package path to avoid breaking user code.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102897911

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
@@ -422,6 +429,99 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}

          + @Test
          + public void testRichDeserializationSchema() throws Exception {
          — End diff –

          I think we should enhance this test to also cover the behaviour when there are multiple `collect`s per record.
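
          As an illustration, a schema along the following lines would exercise multiple `collect()` calls per Kafka record. This is hypothetical test code written against the collector-based interface proposed in this PR, with imports assumed from the surrounding test class:

```java
// Hypothetical: one Kafka message is "unnested" into two elements.
RichKeyedDeserializationSchema<String> splittingSchema = new RichKeyedDeserializationSchema<String>() {
	@Override
	public void deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset, Collector<String> collector) throws IOException {
		String value = new String(message, StandardCharsets.UTF_8);
		// emit the same payload twice to exercise multiple collect() calls per record
		collector.collect(value + "-a");
		collector.collect(value + "-b");
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return nextElement.startsWith("end");
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
};
// the test would then assert that results.size() is twice the number of non-end records
```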

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102901123

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java —
@@ -171,8 +171,9 @@ private static String getResourceFilename(String filename) {
 		private final List<KafkaTopicPartition> partitions;

 		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaConsumer(List<KafkaTopicPartition> partitions) {
-			super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+		DummyFlinkKafkaConsumer(
+			List<KafkaTopicPartition> partitions) {

— End diff –

          If it's just one parameter, I don't think we need a new line.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102900820

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
@@ -176,7 +177,7 @@ public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deseri
 	 * @param props
 	 *            The properties that are used to configure both the fetcher and the offset handler.
 	 */
-	public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+	public FlinkKafkaConsumer08(List<String> topics, RichKeyedDeserializationSchema<T> deserializer, Properties props) {
 		super(topics, deserializer);

— End diff –

          So, instead of changing the constructor, we should still do
          `super(topics, new RichKeyedDeserializationSchemaWrapper(deserializer))`
          here.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r102902685

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java —
@@ -119,7 +119,7 @@ public Void answer(InvocationOnMock invocation) {
 		@SuppressWarnings("unchecked")
 		SourceContext<String> sourceContext = mock(SourceContext.class);
 		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));

-		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+		RichKeyedDeserializationSchemaWrapper<String> schema = new RichKeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

— End diff –

          This file will conflict with the current `master`, because I recently pushed a hotfix to `master` to fix the indentation of this file (previously, it was incorrectly using spaces to indent instead of tabs). Sorry about this!

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3314

          @rmetzger ping...
          just wondering what you think about all the approaches we have discussed here? Your comments are appreciated.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r103339489

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          — End diff –

          Can you please suggest where it should be put?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r103399810

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,54 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          — End diff –

          I would put it perhaps in `org.apache.flink.streaming.kafka.serialization` under `flink-connector-kafka-base`.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r103405663

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java —
          @@ -0,0 +1,61 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.streaming.util.serialization;
          +
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
          +import org.apache.flink.util.Collector;
          +
          +import java.io.IOException;
          +import java.io.Serializable;
          +
          +/**
          + * RichKeyedDeserializationSchema describes how to turn byte key / value messages into zero or more messages into data types.
+ * {@see KeyedSerializationSchema}
+ *
          + * @param <T> The type created by the keyed deserialization schema.
          + */
          +public interface RichKeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
          + /**
          + * Deserializes the byte message.
          + *
          + * @param messageKey the key as a byte array (null if no key has been set)
          + * @param message The message, as a byte array. (null if the message was empty or deleted)
          + * @param partition The partition the message has originated from
          + * @param offset the offset of the message in the original source (for example the Kafka offset)
          + * @param collector the user-provided collector that deserializes the bytes into zero or more
          + * records.
          + *
          + * @return The deserialized message as an object.
          — End diff –

          The method doesn't return anything.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3314

          @StephanEwen and I just had an offline discussion about the change, and we came up with the following thoughts:

          Using an `ArrayList` for buffering elements is an "anti-pattern" in Flink, because it is not a robust solution. Users could theoretically run into the size limit of an array list, and unnesting large messages (in multiple threads in the Kafka 0.8 case) can put pressure on the GC. We think that we should try to avoid that approach if possible.

          Alternative approaches we considered (ordered by preference):

          • Define the DeserializationSchema so that users can return `null` if the user doesn't want to emit a record.
            This approach would not change the current approach, and is pretty minimal. Of course, it would not allow for the "unnesting" use case, where you want to emit multiple records from one Kafka message. Users would need to deserialize into a nested structure and use a flatMap afterwards to do the un-nesting.
          • Move the deserialization into the checkpoint lock. This would allow us to collect elements into our internal collector from the user collector while still preserving exactly once semantics.
            This change would probably be a bit more involved code-wise, as we need to rearrange some parts (maybe moving the deserialization schema instance into the emitRecord() method, change of some method signatures).
            A downside of this approach would be that the Kafka 0.8 consumer threads would deserialize records in a sequential order (since only one consumer thread can hold the lock at a time). For Kafka 0.9 this is already the case. I think we can live with that, because the majority of users moved away from kafka 0.8 by now.
          • Use the `ArrayList` approach. Users would potentially run into issues and we would lose some of Flink's robustness.

          @jgrier since you've opened the original JIRA back then, what's your take on the discussion? How bad would it be for users to just allow the `null` or record approach? (Other opinions are of course also appreciated)
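
          To make the first (preferred) option concrete, a schema that skips corrupted messages by returning `null` could look roughly like the sketch below. The class name and corrupt-detection logic are illustrative only, and the import path assumes the `org.apache.flink.streaming.util.serialization` package referenced elsewhere in this thread:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of the "return null to skip" contract discussed above:
// a corrupted record yields null instead of an exception, and the consumer drops it.
public class SkippingStringSchema implements DeserializationSchema<String> {

	@Override
	public String deserialize(byte[] message) throws IOException {
		try {
			// a real schema would parse JSON / Avro / etc. here
			return new String(message, StandardCharsets.UTF_8);
		} catch (Exception e) {
			// corrupted message: skip it instead of failing the job
			return null;
		}
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}
```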

          githubbot ASF GitHub Bot added a comment -

          Github user jgrier commented on the issue:

          https://github.com/apache/flink/pull/3314

          I think it would be just fine if we allowed a null return given the tradeoffs discussed here. The main thing was to allow users a way to deal with bad data with minimal effort and without throwing an exception and causing their job to restart.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3314

          Thanks for the comments. Allowing `DeserializationSchema` to return `null` sounds good to me. I'll update the PR accordingly.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3314

          Thanks a lot for your understanding @haohui.
          Let us know once you've updated the PR so that we can review and merge it.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3314

          The change looks good to merge in my opinion.
          @tzulitai can you also have a quick look?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r104227652

          — Diff: docs/dev/connectors/kafka.md —
          @@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
          `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
          method gets called for each Kafka message, passing the value from Kafka.

          +There are two possible design choice when the `DeserializationSchema` encounters a corrupted message. It can
          — End diff –

          choices

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r104312079

          — Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java —
@@ -419,6 +424,164 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}

+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+					String topic, int partition, long offset) throws IOException {
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			partitionsWithInitialOffsets,
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			true, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			false);
+
+		// ----- run the fetcher -----
+
+		fetcher.runFetchLoop();
+		assertEquals(1, results.size());
+	}
+
+	@Test
+	public void testNullAsEOF() throws Exception {

— End diff –

          I'm not sure if this test is necessary. It's essentially just testing that `isEndOfStream` works when `isEndOfStream` is `true`. Whether or not the condition is `element == null` seems irrelevant to what's been tested.

          We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3314#discussion_r104311996

          — Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -381,6 +381,10 @@ else if (partitionsRemoved)
           	{
           		partitionsIterator.remove();
           		continue partitionsLoop;
           	}
          +
          +	if (value == null) {
          +		continue;
          +	}
          — End diff –

          Would it make sense to do the `null` check inside `emitRecord(...)`?
          Otherwise, we wouldn't be updating the state for skipped records, and therefore not accounting them as "already processed".

          I don't think it really matters, since we aren't outputting anything anyway, but I see at least one minor advantage that might make the change worthwhile: if we fail during a series of consecutive skipped records, we won't waste any overhead re-processing them on restore.
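          To make the trade-off concrete, the following is a minimal, self-contained sketch of doing the null check inside the emit path; `PartitionState`, `checkpointLock`, and `collector` are hypothetical stand-ins rather than Flink's actual internals. A skipped (`null`) record is not emitted, but its offset is still advanced under the same lock that guards emission, so it counts as "already processed" for checkpointing purposes.

          // Hypothetical sketch: skipped records still advance the partition offset,
          // so a restore after a failure does not re-read a long run of corrupted messages.
          final class PartitionState {
          	private volatile long offset;

          	long getOffset() {
          		return offset;
          	}

          	void setOffset(long offset) {
          		this.offset = offset;
          	}
          }

          final class EmitSketch {
          	private final Object checkpointLock = new Object();
          	private final java.util.List<String> collector = new java.util.ArrayList<>();

          	void emitRecord(String record, PartitionState state, long offset) {
          		synchronized (checkpointLock) {
          			if (record != null) {
          				collector.add(record); // only non-null records are emitted downstream
          			}
          			state.setOffset(offset);   // the offset is marked as processed either way
          		}
          	}
          }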

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3314

          CI failed in one of the groups because that group timed out. The same group did not time out in the last run.

          @tzulitai can you please take another look? Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3314

          LGTM, I'll proceed to merge this later today.
          One minor problem: the offset state still isn't updated if `record == null`. We need to do the check inside the synchronized block in the `emitRecord*` methods.

          It's a simple fix, so I can do it while merging

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3314

          I've made some final general improvements in https://github.com/tzulitai/flink/tree/PR-FLINK-3679.

          Doing a Travis run before merging:
          https://travis-ci.org/tzulitai/flink/builds/209054624

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3314

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master with http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31.

          Thanks a lot for your contribution Haohui Mai!

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I'm a bit hesitant about whether we want to backport this fix to release-1.1 and release-1.2.

          I think it's ok, since it doesn't break any user code.

          Robert Metzger Jamie Grier what do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3314

          @haohui - one suggestion for future contributions, to make reviews easier:
          We usually use follow-up commits that address review comments, instead of force-pushing the whole branch. For reviewers, this makes it easier to track what has been addressed and fixed since the original PR.

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3314

          Yes, totally agree. Thanks very much for taking the time to review the PRs. Will do that next time.


            People

            • Assignee:
              wheat9 Haohui Mai
            • Reporter:
              jgrier Jamie Grier
            • Votes:
              0
            • Watchers:
              8
