FLINK-5583: Support flexible error handling in the Kafka consumer

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kafka Connector
    • Labels: None

      Description

We found that it is valuable to allow applications to handle errors and exceptions in the Kafka consumer in order to build robust applications in production.

      The context is the following:

(1) We have schematized Avro records flowing through Kafka.
(2) The decoder implements the DeserializationSchema to decode the records.
(3) Occasionally there are corrupted records (e.g., schema issues). The streaming pipeline might want to bail out (which is the current behavior) or to skip the corrupted records, depending on the application; a sketch of the current decoder behavior follows this list.
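For concreteness, a minimal sketch of the decoder side as it exists today (the Event type and the decodeAvro helper are hypothetical):

    import java.io.IOException;
    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

    // Sketch only: an Avro decoder plugged into the Kafka consumer.
    // Any decoding failure currently propagates out of deserialize()
    // and fails the whole job; there is no hook to skip the record.
    public class AvroEventSchema extends AbstractDeserializationSchema<Event> {
        @Override
        public Event deserialize(byte[] message) throws IOException {
            // Throws on a corrupted record (e.g., a schema mismatch),
            // bailing out the pipeline as described above.
            return decodeAvro(message); // hypothetical helper
        }
    }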

      Two options are available:

(1) Have a variant of DeserializationSchema that returns results through a FlatMap-like interface, as suggested in FLINK-3679.
(2) Allow applications to catch and handle exceptions by exposing APIs similar to the ExceptionProxy.

      Thoughts?


          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master with http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31.

          Thanks a lot for your contribution Haohui Mai!

tzulitai Tzu-Li (Gordon) Tai added a comment - edited

The collector can be made thread-safe, yes, depending on the implementation. But for the synchronization to work properly, user implementations of deserialize(bytes, collector) will need to make sure that all outputs are added to the collector before returning from the method.

Also note that the underlying implementation might differ between 0.8 and 0.9+, mainly due to the different threading models for how partitions are consumed.
For 0.8 I think we need a separate collector for each subscribed Kafka partition, while in 0.9+ we can have a single collector for the whole subtask.
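For illustration, a minimal sketch of such a collector, assuming the OutputCollector interface from the proposal below exposes a collect(record) method (everything else here is hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: buffers deserialized records until the consumer drains
    // them under the checkpoint lock. The synchronized methods make a
    // single instance safe to share; with one instance per 0.8
    // partition thread, the synchronization is essentially uncontended.
    public class BufferingCollector<T> implements OutputCollector<T> {
        private final List<T> buffer = new ArrayList<>();

        @Override
        public synchronized void collect(T record) {
            buffer.add(record);
        }

        // Called by the consumer while holding the checkpoint lock.
        public synchronized List<T> drain() {
            List<T> drained = new ArrayList<>(buffer);
            buffer.clear();
            return drained;
        }
    }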

          wheat9 Haohui Mai added a comment -

The interface looks good to me. It looks like you want to keep the deserialization outside of the lock. Is it okay to assume that the collector is thread-safe?

tzulitai Tzu-Li (Gordon) Tai added a comment - edited

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the writeToExternalSources method; some users might just want to drop the corrupt record).

          First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

          public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
              // user uses collector to buffer outputs
              void deserialize(byte[] message, OutputCollector<T> collector);
          }
          

          Something like the above (ignore the dummy name, we can think of a better one :-D).

          The way it would work is:
          1. Consumer starts processing record with offset 32 (example).
          2. Have an internal buffer in the consumer to collect the zero or more records produced by calling deserialization.deserialize(recordBytes, collector).
          3. All the records in the buffer must be flushed, and offset 32 updated into internal consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. Checkpointing is synchronized on the same lock, so we can make sure that a checkpoint barrier will only come either before or after all the records produced from offset 32, never in between.

          For the synchronization explained above, we do not need to expose another flush method to the user.
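As a rough sketch of that flow (the types, fetchNext, emitRecord, and the state bookkeeping are simplified placeholders):

    // Sketch of the fetch loop: deserialize outside the lock, then
    // flush the buffered records and advance the offset atomically.
    void runFetchLoop() throws Exception {
        BufferingCollector<T> collector = new BufferingCollector<>();
        while (running) {
            ConsumerRecord record = fetchNext();           // hypothetical fetch
            deserializer.deserialize(record.bytes(), collector);

            synchronized (checkpointLock) {
                // Emit all buffered records and update the offset as one
                // atomic step; barriers are injected under the same lock,
                // so one can never land in between.
                for (T out : collector.drain()) {
                    emitRecord(out);                       // forward downstream
                }
                partitionState.setOffset(record.offset()); // e.g., offset 32
            }
        }
    }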

For your use case, in which you want to write corrupt bytes to a storage, you would do that with a try-catch block in your implementation of deserialization.deserialize(bytes, collector). The only limitation is that the write must be a blocking call, which might be acceptable depending on the frequency of corrupt messages. What do you think Haohui Mai?
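In other words, something along these lines (the corruptRecordStore client and decodeAvro helper are hypothetical):

    @Override
    public void deserialize(byte[] message, OutputCollector<Event> collector) {
        try {
            collector.collect(decodeAvro(message));   // hypothetical decoder
        } catch (Exception e) {
            // Blocking write; acceptable if corrupt messages are rare.
            corruptRecordStore.write(message);
        }
    }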

          wheat9 Haohui Mai added a comment -

          The pseudo-code looks like the following:

    void invoke() {
      try {
        Tuple2<Key, Value> kv = deserialization.deserialize(bytes);
      } catch (Throwable t) {
        // Write the corrupted message to external sources (e.g., Kafka, HDFS)
        deserialization.writeToExternalSources(bytes);
      }
    }

    void checkpoint() {
      ...
      // Flush buffered corrupted messages before the checkpoint completes.
      deserialization.flush();
    }

Strictly speaking, the DeserializationSchema does not need to have persistent state (which IMO is the right approach). However, it does require proper synchronization when the consumer checkpoints. Does it make sense to you Tzu-Li (Gordon) Tai?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Regarding breaking user code when changing to a flatMap-like DeserializationSchema: I would actually prefer to have a new separate interface instead of breaking the original one. We could make use of FLINK-5704 to migrate to the new deserialization interface.
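As a sketch, the old interface could then be bridged onto the new one with a small adapter, so existing user code keeps working unchanged (NewDeserializationSchema and OutputCollector are the dummy names from the proposal below):

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Sketch: wraps a legacy DeserializationSchema as the proposed
    // collector-based interface without breaking existing user code.
    public class LegacySchemaAdapter<T> implements NewDeserializationSchema<T> {
        private final DeserializationSchema<T> legacy;

        public LegacySchemaAdapter(DeserializationSchema<T> legacy) {
            this.legacy = legacy;
        }

        @Override
        public void deserialize(byte[] message, OutputCollector<T> collector) {
            try {
                T record = legacy.deserialize(message);
                if (record != null) {
                    collector.collect(record);
                }
            } catch (IOException e) {
                // Preserve the legacy fail-fast behavior.
                throw new RuntimeException(e);
            }
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return legacy.getProducedType();
        }
    }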

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Hi Haohui Mai,

          I'm not quite sure why your case requires the DeserializationSchema to be stateful. Could you elaborate a bit more? What is the state of the DeserializationSchema you have in mind?

          wheat9 Haohui Mai added a comment -

In general, (1) sounds good to me. Taking a closer look, it seems that it might require a stateful API instead of the traditional Collector APIs.

We have a mission-critical use case that needs to write all corrupted messages to a persistent store so that these messages can be inspected and backfilled later. Ideally the DeserializationSchema could hold some state, and it would probably need to be synchronized when checkpoints happen.

          It might be more natural to deserialize messages as a subsequent stage of the consumer. Thoughts?
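For example, a sketch of that alternative, where the source emits raw bytes and a downstream flatMap does the decoding (RawBytesSchema, decodeAvro, and corruptStore are hypothetical; FlatMapFunction and Collector are the standard Flink types):

    DataStream<byte[]> raw = env.addSource(
        new FlinkKafkaConsumer09<>("topic", new RawBytesSchema(), props));

    DataStream<Event> events = raw.flatMap(new FlatMapFunction<byte[], Event>() {
        @Override
        public void flatMap(byte[] bytes, Collector<Event> out) {
            try {
                out.collect(decodeAvro(bytes));   // hypothetical decoder
            } catch (Exception e) {
                corruptStore.write(bytes);        // persist for later backfill
            }
        }
    });

This keeps the error handling in ordinary user code, at the cost of an extra stage in the pipeline.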

cc Robert Metzger, Tzu-Li (Gordon) Tai

          rmetzger Robert Metzger added a comment -

I think fixing the issue through FLINK-3679 (i.e., option (1)) is the right way to go.


            People

    • Assignee:
      wheat9 Haohui Mai
    • Reporter:
      wheat9 Haohui Mai
    • Votes: 0
    • Watchers: 3
