KAFKA-13278

Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.6.0
    • Fix Version/s: None
    • Component/s: clients, documentation
    • Labels: None

    Description

      The documentation of the org.apache.kafka.common.serialization.Deserializer interface states that implementations must expect null byte arrays and should handle them in a meaningful way.
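As an illustration of what that contract asks for, here is a minimal sketch of a deserializer that treats a null byte array as an absent value instead of throwing. To keep it self-contained, NullableDeserializer is a hypothetical stand-in that mirrors only the two-argument deserialize(String topic, byte[] data) method of the Kafka interface; a real implementation would implement org.apache.kafka.common.serialization.Deserializer directly.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the two-argument method of
// org.apache.kafka.common.serialization.Deserializer, so this compiles without kafka-clients.
interface NullableDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Decodes UTF-8 text and maps a null payload (for example a deletion marker)
// to null instead of throwing an exception.
final class NullSafeStringDeserializer implements NullableDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // absent key or value: nothing to decode
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

The explicit null branch is exactly the code path the rest of this report argues is unreachable when the deserializer is used by the consumer.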

      However, at least in the Kafka client, it seems to be impossible to actually get a null value into a deserializer, because the class org.apache.kafka.clients.consumer.internals.Fetcher does not call the registered deserializer when a record's key or value is null:

      private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                               RecordBatch batch,
                                               Record record) {
          try {
              long offset = record.offset();
              long timestamp = record.timestamp();
              Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch());
              TimestampType timestampType = batch.timestampType();
              Headers headers = new RecordHeaders(record.headers());
              ByteBuffer keyBytes = record.key();
              byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
              // A null key short-circuits here: keyDeserializer is never invoked with null.
              K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
              ByteBuffer valueBytes = record.value();
              byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
              // Same for values: a null value never reaches valueDeserializer.
              V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
              return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                          timestamp, timestampType, record.checksumOrNull(),
                                          keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                          valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                          key, value, headers, leaderEpoch);
          } catch (RuntimeException e) {
              throw new SerializationException("Error deserializing key/value for partition " + partition +
                      " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
          }
      }
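The effect of the two null checks in parseRecord can be reproduced in isolation. The sketch below is a hypothetical, heavily simplified model of the `valueBytes == null ? null : valueDeserializer.deserialize(...)` expression: it uses byte[] instead of ByteBuffer, omits headers, and the names PayloadDeserializer, CountingDeserializer and FetcherModel are illustrative, not Kafka APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the relevant method of Kafka's Deserializer interface.
interface PayloadDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Counts invocations and remembers whether it was ever handed a null payload.
final class CountingDeserializer implements PayloadDeserializer<String> {
    int calls = 0;
    boolean sawNull = false;

    @Override
    public String deserialize(String topic, byte[] data) {
        calls++;
        if (data == null) {
            sawNull = true;
            return "<null handled by deserializer>";
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}

final class FetcherModel {
    // Simplified mirror of the Fetcher logic: a null payload bypasses the deserializer.
    static <V> V parseValue(String topic, byte[] valueBytes, PayloadDeserializer<V> deserializer) {
        return valueBytes == null ? null : deserializer.deserialize(topic, valueBytes);
    }
}
```

Under this model, parseValue("t", null, d) returns null while d.calls stays at 0, so the deserializer's own null branch is dead code, which is the mismatch with the documented contract.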
      

      I implemented an ErrorHandlingDeserializer that wraps the actual deserializers and records the result (the value or the thrown exception) in a container object (ReadResult):

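      import static java.util.Objects.requireNonNull;
      
      import org.apache.kafka.common.serialization.Deserializer;
      
      // ReadResult<T>, Envelope<T> and the @Nullable annotation are defined
      // outside this snippet and are not shown here.
      
      /**
       * Handles exceptions thrown during deserialization by a delegate {@link Deserializer}.
       *
       * @param <T> type of the deserialized object
       */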
      final class ErrorHandlingDeserializer<T> implements Deserializer<ReadResult<T>> {
      
        private final Deserializer<Envelope<T>> delegate;
      
        private ErrorHandlingDeserializer(Deserializer<Envelope<T>> delegate) {
          this.delegate = requireNonNull(delegate);
        }
      
        static <T> ErrorHandlingDeserializer<T> wrap(Deserializer<Envelope<T>> delegate) {
          return new ErrorHandlingDeserializer<>(delegate);
        }
      
        @Override
        public ReadResult<T> deserialize(String topic, @Nullable byte[] data) {
          try {
            return ReadResult.successful(delegate.deserialize(topic, data));
          } catch (Exception e) {
            return ReadResult.failed(e);
          }
        }
      }
      

      This deserializer never produces a null value. However, because of the Fetcher behavior, I still have to check the consumer records for null values everywhere they are used. I also have to check for a null value inside the ReadResult container, because the Deserializer API states that null data may be passed in and there is no guarantee that the Fetcher behavior will never change.
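To make the cost concrete, here is a minimal sketch of the consumer-side code this forces. The reporter's actual ReadResult class is not shown in the issue, so the simplified version below (a possibly null value or an exception) is a hypothetical reconstruction, and RecordHandler.describe is an illustrative helper. A null record value means the Fetcher skipped the deserializer; a null value inside a successful ReadResult means a null payload did reach it.

```java
import java.util.Objects;

// Hypothetical, simplified version of the ReadResult container:
// holds either a (possibly null) value or the exception thrown while deserializing.
final class ReadResult<T> {
    private final T value;
    private final Exception error;

    private ReadResult(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> ReadResult<T> successful(T value) { return new ReadResult<>(value, null); }
    static <T> ReadResult<T> failed(Exception e) { return new ReadResult<>(null, Objects.requireNonNull(e)); }

    boolean isFailed() { return error != null; }
    T value() { return value; }
    Exception error() { return error; }
}

final class RecordHandler {
    // 'result' stands for ConsumerRecord.value() when the value deserializer
    // is the error-handling wrapper; typed as ReadResult<String> for brevity.
    static String describe(ReadResult<String> result) {
        if (result == null) {
            // Check 1: the Fetcher never called the deserializer (null payload).
            return "tombstone (deserializer skipped)";
        }
        if (result.isFailed()) {
            return "failed: " + result.error().getMessage();
        }
        if (result.value() == null) {
            // Check 2: a null payload that did reach the deserializer.
            return "tombstone (deserializer called)";
        }
        return "value: " + result.value();
    }
}
```

Both null branches have to exist because nothing in the client API says which of the two situations can actually occur.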

      In my opinion this behavior is a bug, because anyone implementing a Deserializer would expect to actually receive null values (for example, for deletions). The client should guarantee either that deserializers always receive null values or that they never receive them.
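The first of those two guarantees, always handing null payloads to the deserializer, could look like the following sketch. It is hypothetical and is not how Kafka's Fetcher is implemented: AlwaysDeserializeFetcherModel.parseValueAlways calls the deserializer unconditionally, so a deserializer that never returns null (here one that maps null to Optional.empty()) would also guarantee a non-null record value. All names are illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for Kafka's Deserializer interface.
interface ValueDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Never returns null: a null payload becomes Optional.empty().
final class OptionalStringDeserializer implements ValueDeserializer<Optional<String>> {
    @Override
    public Optional<String> deserialize(String topic, byte[] data) {
        return data == null
                ? Optional.empty()
                : Optional.of(new String(data, StandardCharsets.UTF_8));
    }
}

final class AlwaysDeserializeFetcherModel {
    // Proposed variant: the deserializer sees every payload, including null,
    // so it alone decides how absent values are represented.
    static <V> V parseValueAlways(String topic, byte[] valueBytes, ValueDeserializer<V> deserializer) {
        return deserializer.deserialize(topic, valueBytes);
    }
}
```

With this variant, consumer code only has to handle Optional.empty(). The other option, guaranteeing that deserializers never see null, would presumably keep the current Fetcher check and instead change the Deserializer documentation.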


    People

        Assignee: Unassigned
        Reporter: Julian Reichinger (julian.reichinger)
        Votes: 0
        Watchers: 1
