Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9673

Conditionally apply SMTs



    • New Feature
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 2.6.0
    • connect
    • None


      KAFKA-7052 ended up using IAE with a message, rather than NPE in the case of a SMT being applied to a record lacking a given field. It's still not possible to apply a SMT conditionally, which is what things like Debezium really need in order to apply transformations only to non-schema change events.

      rhauch suggested a mechanism to conditionally apply any SMT but was concerned about the possibility of a naming collision (assuming it was configured by a simple config)

      I'd like to propose something which would solve this problem without the possibility of such collisions. The idea is to have a higher-level condition, which applies an arbitrary transformation (or transformation chain) according to some predicate on the record.

      More concretely, it might be configured like this:

        transforms.conditionalExtract.type: Conditional
        transforms.conditionalExtract.transforms: extractInt
        transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
        transforms.conditionalExtract.transforms.extractInt.field: c1
        transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
      • The Conditional SMT is configured with its own list of transforms (transforms.conditionalExtract.transforms) to apply. This would work just like the top level transforms config, so subkeys can be used to configure these transforms in the usual way.
      • The condition config defines the predicate for when the transforms are applied to a record using a <condition-type>:<parameters> syntax

      We could initially support three condition types:

      topic-matches:<pattern> The transformation would be applied if the record's topic name matched the given regular expression pattern. For example, the following would apply the transformation on records being sent to any topic with a name beginning with "my-prefix-":

             transforms.conditionalExtract.condition: topic-matches:my-prefix-.*

      has-header:<header-name> The transformation would be applied if the record had at least one header with the given name. For example, the following will apply the transformation on records with at least one header with the name "my-header":

             transforms.conditionalExtract.condition: has-header:my-header

      not:<condition-name> This would negate the result of another named condition using the condition config prefix. For example, the following will apply the transformation on records which lack any header with the name my-header:

            transforms.conditionalExtract.condition: not:hasMyHeader
            transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header

      I foresee one implementation concern with this approach, which is that currently Transformation has to return a fixed ConfigDef, and this proposal would require something more flexible in order to allow the config parameters to depend on the listed transform aliases (and similarly for named predicate used for the not: predicate). I think this could be done by adding a default method to Transformation for getting the ConfigDef given the config, for example.

      Obviously this would require a KIP, but before I spend any more time on this I'd be interested in your thoughts rhauch, rmoff, gunnar.morling.


        Issue Links



              tombentley Tom Bentley
              tombentley Tom Bentley
              Konstantine Karantasis Konstantine Karantasis
              0 Vote for this issue
              2 Start watching this issue