KAFKA-4691

ProducerInterceptor.onSend() is called after key and value are serialized

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.10.1.1
    • Fix Version/s: None
    • Component/s: clients, streams
    • Labels: None

    Description

      According to the JavaDoc (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html): "This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord)".

      However, when using this with Kafka Streams (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the key and value contained in the record object are already serialized.

      As you can see from the screenshot, the serialization is performed inside RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner), i.e., before the producer's send method is called, which is what triggers the interceptor.

      This makes it impossible to perform any kind of operation involving the key or value of the message without at least performing an additional deserialization.
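
      For illustration, here is a minimal sketch (not part of the ticket; the class name is hypothetical) of an interceptor that logs the runtime types it receives. When registered via StreamsConfig as described above, both key() and value() arrive as byte[]:

          import java.util.Map;
          import org.apache.kafka.clients.producer.ProducerInterceptor;
          import org.apache.kafka.clients.producer.ProducerRecord;
          import org.apache.kafka.clients.producer.RecordMetadata;

          // Hypothetical interceptor used only to demonstrate the reported behavior.
          public class TypeLoggingInterceptor implements ProducerInterceptor<Object, Object> {
              @Override
              public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
                  // Under Kafka Streams this prints class [B (i.e. byte[]) for both,
                  // although the JavaDoc suggests the unserialized types should be visible here.
                  System.out.println("key type: " + (record.key() == null ? "null" : record.key().getClass()));
                  System.out.println("value type: " + (record.value() == null ? "null" : record.value().getClass()));
                  return record;
              }

              @Override
              public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

              @Override
              public void close() { }

              @Override
              public void configure(Map<String, ?> configs) { }
          }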

      Attachments

        Activity

          mjsax Matthias J. Sax added a comment -

          I am not sure if we can fix this soon. The problem is that Streams uses a single `KafkaProducer` for all output topics. All those output topics might have different types for keys and values, while `KafkaProducer` can only be configured to use a single serializer: one for the key and one for the value. Thus, Kafka Streams configures the producer to use byte[] as key and value type and applies the appropriate serializers per output topic before handing the record to the producer. Right now, I would rather close this as "won't fix". \cc guozhang
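
          As a hedged illustration of the setup described above (the bootstrap address is a placeholder; this mirrors what Streams does internally rather than quoting its code), the single internal producer is configured for raw bytes:

              import java.util.Properties;
              import org.apache.kafka.clients.producer.KafkaProducer;
              import org.apache.kafka.clients.producer.ProducerConfig;
              import org.apache.kafka.common.serialization.ByteArraySerializer;

              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              // One producer serves all output topics, so it can only be given one
              // serializer pair; byte[] is the common denominator.
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
              KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

          Each sink then serializes with its own topic-specific serializers before calling send().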
          francescolemma Francesco Lemma added a comment - - edited

          Thanks for commenting Matthias. The fact that there's one single producer which uses byte[] for key and value isn't necessarily a limitation. The problem is that the interceptor is called after the serialization happens. If the interceptor was called before (as it should be, according to the JavaDoc), then it could manipulate the message (because the message is not yet serialized) and, regardless of the type, the manipulated message would then be serialized to a byte[] by the RecordCollector and would be compatible with the single Kafka Streams producer. Am I missing something here? I think the problem is where the interceptors are triggered.
          mjsax Matthias J. Sax added a comment - - edited

          Sorry for not being precise enough. For KafkaProducer, interceptors are called before the producer does any serialization. However, in Streams, the producer never sees unserialized data, because before the producer's send() method is called, Streams has already serialized the data and gives byte arrays to the producer. Note that RecordCollectorImpl is part of Streams and not of KafkaProducer (it just uses KafkaProducer). Btw: for this case, the producer skips any serialization step because the data is already of type byte[].

          In Streams, we cannot give unserialized data to the KafkaProducer because the producer can only have a single serializer. Thus, as we need to handle more than one key type, KafkaProducer cannot do the serialization, since it can only serialize a single type. The solution is to do the serialization before giving the data to the producer.

          mjsax Matthias J. Sax added a comment -

          Btw: we have gotten requests for "dynamic routing" (i.e., computing the output topic name at runtime) multiple times already, and we might want to add this at some point. But this is a new feature and IMHO there is no bug.

          francescolemma Francesco Lemma added a comment -

          Thanks Matthias. I perfectly understand what you are saying and I agree 100%. Maybe I'm not expressing myself clearly here. What I'm proposing is to extract the call to the interceptors from the KafkaProducer and put it in the RecordCollectorImpl. To avoid affecting non-Streams usage, there could be an overloaded method in the KafkaProducer similar to this:

          //Overloaded method. This will be called by the RecordCollectorImpl with triggerInterceptors = false
          public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, boolean triggerInterceptors) {
              // intercept the record, which can be potentially modified; this method does not throw exceptions
              ProducerRecord<K, V> interceptedRecord = this.interceptors == null || !triggerInterceptors ? record : this.interceptors.onSend(record);
              return doSend(interceptedRecord, callback);
          }

          //This method has the original signature. All existing calls to this KafkaProducer send method keep calling this overload.
          public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
              return send(record, callback, true);
          }
          

          Then the RecordCollectorImpl.send(...) method could potentially be modified as follows:

              @Override
              public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
                                      StreamPartitioner<K, V> partitioner) {
          
                  //The visibility of the interceptors would need to be changed, or another way to expose them implemented.
                  //This logic is moved here from the KafkaProducer.
                  ProducerRecord<K, V> interceptedRecord = this.producer.interceptors == null ? record : this.producer.interceptors.onSend(record);
          
                  byte[] keyBytes = keySerializer.serialize(interceptedRecord.topic(), interceptedRecord.key());
                  byte[] valBytes = valueSerializer.serialize(interceptedRecord.topic(), interceptedRecord.value());
                  Integer partition = interceptedRecord.partition();
                  if (partition == null && partitioner != null) {
                      List<PartitionInfo> partitions = this.producer.partitionsFor(interceptedRecord.topic());
                      if (partitions != null && partitions.size() > 0)
                          partition = partitioner.partition(interceptedRecord.key(), interceptedRecord.value(), partitions.size());
                  }
          
                  ProducerRecord<byte[], byte[]> serializedRecord =
                          new ProducerRecord<>(interceptedRecord.topic(), partition, interceptedRecord.timestamp(), keyBytes, valBytes);
                  final String topic = serializedRecord.topic();
          
                  .......
                  .......
                  .......
          
              }
          

          I would be more than willing to make the changes and send a pull request if it makes sense to you guys. I believe that the ProducerInterceptor is very valuable functionality, and this issue makes it realistically not very useful within Kafka Streams.
          francescolemma Francesco Lemma added a comment - - edited

          mjsax This issue is not related to dynamic routing. Even though the problem I'm trying to solve is dynamic routing, the point is that the ProducerInterceptor cannot be used within Kafka Streams to perform operations which involve the key and/or the value of the message without at least the cost of an additional deserialization. (I say "at least" because there's potentially also an additional serialization, in case the intercepted record needs to carry a modified version of the message.)
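
          For illustration (not from the ticket), a minimal sketch of the workaround cost described above, assuming String keys and values and the stock StringSerializer/StringDeserializer; an interceptor running after Streams' serialization has to round-trip through bytes just to touch the value:

              import org.apache.kafka.clients.producer.ProducerRecord;
              import org.apache.kafka.common.serialization.StringDeserializer;
              import org.apache.kafka.common.serialization.StringSerializer;

              // Inside a ProducerInterceptor<byte[], byte[]> under Streams:
              public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
                  // Extra deserialization just to inspect the value.
                  String value = new StringDeserializer().deserialize(record.topic(), record.value());
                  String modified = value.toUpperCase();
                  // And an extra serialization if the record is modified.
                  byte[] newValue = new StringSerializer().serialize(record.topic(), modified);
                  return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                          record.key(), newValue);
              }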

          mjsax Matthias J. Sax added a comment -

          I see. Understood now. Seems to be a fair change. But I think we should do it differently, to avoid the producer API change. Because Streams forwards the producer config provided in StreamsConfig, we can just omit forwarding the interceptor configuration – thus the KafkaProducer will not have any interceptors configured in the first place – and have Streams take the interceptors from StreamsConfig and call them itself.

          But I am not sure if there are any other things to consider if Streams does the interceptor call instead of the producer. \cc apovzner
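
          A rough sketch of this approach, hedged: AbstractConfig.getConfiguredInstances() is how Kafka normally instantiates configured classes, but whether the producer-prefixed key resolves exactly this way inside Streams is an assumption, and streamsConfig is a placeholder for the application's StreamsConfig instance:

              import java.util.List;
              import org.apache.kafka.clients.producer.ProducerConfig;
              import org.apache.kafka.clients.producer.ProducerInterceptor;
              import org.apache.kafka.streams.StreamsConfig;

              // Streams instantiates the interceptors itself instead of forwarding
              // interceptor.classes to the internal KafkaProducer.
              List<ProducerInterceptor> interceptors = streamsConfig.getConfiguredInstances(
                      StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
                      ProducerInterceptor.class);
              // RecordCollectorImpl would then call each interceptor's onSend() before serializing.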
          apovzner Anna Povzner added a comment -

          I agree with mjsax about not changing the KafkaProducer API. Instead, if we make that change, the producer should not have any interceptors configured, and Streams should do the intercepting.

          In the case of completely disabling the producer interceptors and implementing this functionality in Streams, RecordCollectorImpl.send() should also call the interceptors' onAcknowledgement(), in the same situations as KafkaProducer does. E.g., if send() fails, onAcknowledgement() should be called with a mostly empty RecordMetadata, but with topic and partition set. Also, onAcknowledgement() should be called from the onCompletion() in RecordCollectorImpl.send(). It looks like all of that could be implemented in RecordCollectorImpl.send().
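
          To make this concrete, a hedged sketch (the interceptors list is the hypothetical Streams-managed list from the previous sketch, and serializedRecord is the byte[]/byte[] record built in RecordCollectorImpl.send()) of wiring onAcknowledgement() into the completion callback:

              import org.apache.kafka.clients.producer.Callback;
              import org.apache.kafka.clients.producer.ProducerInterceptor;
              import org.apache.kafka.clients.producer.RecordMetadata;

              this.producer.send(serializedRecord, new Callback() {
                  @Override
                  public void onCompletion(RecordMetadata metadata, Exception exception) {
                      // Mirror KafkaProducer's behavior: notify interceptors on both
                      // success and failure (metadata may be null when exception is set).
                      for (ProducerInterceptor interceptor : interceptors) {
                          interceptor.onAcknowledgement(metadata, exception);
                      }
                      // ... existing RecordCollectorImpl offset tracking / error handling ...
                  }
              });

          The failure path mentioned above (calling onAcknowledgement() with a mostly empty RecordMetadata when send() itself throws) would need the same treatment around the send() call.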

          francescolemma Francesco Lemma added a comment -

          Thanks mjsax and apovzner. Your points make complete sense. According to the "Contributing Code Changes" page, I should assign this ticket to myself before starting to work on it, but I don't think I have the permission to do so. Can you please assign it to me?

          mjsax Matthias J. Sax added a comment -

          You should now have permission to assign it to yourself. I asked a committer to add you to the list of contributors. (In general, you can request this via the dev mailing list.)
          bbejeck Bill Bejeck added a comment -

          francescolemma are you still interested in doing this JIRA?

          People

            Assignee: francescolemma Francesco Lemma
            Reporter: francescolemma Francesco Lemma
            Anna Povzner
            Votes: 1
            Watchers: 6

            Dates

              Created:
              Updated: