  1. Kafka
  2. KAFKA-7273

Converters should have access to headers.

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.4.0
    • Component/s: connect
    • Labels: None

    Description

      I found myself wanting to build a converter that stores additional type information in headers. The Converter interface does not give a developer access to the record headers. I'm not suggesting that we change how headers are serialized, but rather that org.apache.kafka.connect.header.Headers be passed to fromConnectData and toConnectData. For example, something like this:

      package org.apache.kafka.connect.storage;

      import java.util.Map;

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaAndValue;
      import org.apache.kafka.connect.header.Headers;

      public interface Converter {
        // Proposed overload: receives the record headers and, by default,
        // delegates to the existing header-less method.
        default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
          return fromConnectData(topic, schema, value);
        }

        // Proposed overload: receives the record headers and, by default,
        // delegates to the existing header-less method.
        default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
          return toConnectData(topic, value);
        }

        // Existing methods, unchanged.
        void configure(Map<String, ?> configs, boolean isKey);

        byte[] fromConnectData(String topic, Schema schema, Object value);

        SchemaAndValue toConnectData(String topic, byte[] value);
      }
      
      

      This would be a similar approach to what was already done with ExtendedDeserializer and ExtendedSerializer in the Kafka client.
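
      To illustrate the use case, here is a minimal, self-contained sketch of a converter that writes type information to a header on serialization and reads it back on deserialization. It does not use the Kafka Connect classes: SimpleHeaders, HeaderAwareConverter, TypeTaggingConverter, and the header name "connect.value.type" are all hypothetical stand-ins, and schemas are left out to keep the example short.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.connect.header.Headers,
// reduced to a name -> raw bytes map so the sketch compiles without Kafka.
final class SimpleHeaders {
    private final Map<String, byte[]> values = new HashMap<>();

    SimpleHeaders add(String key, byte[] value) {
        values.put(key, value);
        return this;
    }

    byte[] get(String key) {
        return values.get(key);
    }
}

// Simplified form of the proposed interface (schemas omitted): the
// header-aware overloads default to the existing header-less methods.
interface HeaderAwareConverter {
    byte[] fromConnectData(String topic, Object value);

    Object toConnectData(String topic, byte[] payload);

    default byte[] fromConnectData(String topic, SimpleHeaders headers, Object value) {
        return fromConnectData(topic, value);
    }

    default Object toConnectData(String topic, SimpleHeaders headers, byte[] payload) {
        return toConnectData(topic, payload);
    }
}

// Hypothetical converter that records the value's type in a header
// instead of embedding it in the payload.
final class TypeTaggingConverter implements HeaderAwareConverter {
    static final String TYPE_HEADER = "connect.value.type"; // made-up header name

    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectData(String topic, byte[] payload) {
        // Without headers there is no type information, so return text.
        return new String(payload, StandardCharsets.UTF_8);
    }

    @Override
    public byte[] fromConnectData(String topic, SimpleHeaders headers, Object value) {
        String type = (value instanceof Integer) ? "int32" : "string";
        headers.add(TYPE_HEADER, type.getBytes(StandardCharsets.UTF_8));
        return fromConnectData(topic, value);
    }

    @Override
    public Object toConnectData(String topic, SimpleHeaders headers, byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        byte[] type = headers.get(TYPE_HEADER);
        if (type != null && "int32".equals(new String(type, StandardCharsets.UTF_8))) {
            return Integer.valueOf(text); // restore the original type from the header
        }
        return text;
    }
}
```

      With the header-aware overloads, an Integer survives a round trip as an Integer, while a caller that only knows the header-less methods still gets the payload back as text. Existing converters that implement only the header-less methods would keep working through the defaults.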

            People

              Assignee: Jeremy Custenborder (jcustenborder)
              Reporter: Jeremy Custenborder (jcustenborder)
              Reviewer: Randall Hauch
              Votes: 2
              Watchers: 8
