KAFKA-7052 ended up using IAE with a message, rather than NPE in the case of a SMT being applied to a record lacking a given field. It's still not possible to apply a SMT conditionally, which is what things like Debezium really need in order to apply transformations only to non-schema change events.
Randall Hauch suggested a mechanism to conditionally apply any SMT but was concerned about the possibility of a naming collision (assuming it was configured by a simple config)
I'd like to propose something which would solve this problem without the possibility of such collisions. The idea is to have a higher-level condition, which applies an arbitrary transformation (or transformation chain) according to some predicate on the record.
More concretely, it might be configured like this:
- The Conditional SMT is configured with its own list of transforms (transforms.conditionalExtract.transforms) to apply. This would work just like the top level transforms config, so subkeys can be used to configure these transforms in the usual way.
- The condition config defines the predicate for when the transforms are applied to a record using a <condition-type>:<parameters> syntax
We could initially support three condition types:
topic-matches:<pattern> The transformation would be applied if the record's topic name matched the given regular expression pattern. For example, the following would apply the transformation on records being sent to any topic with a name beginning with "my-prefix-":
has-header:<header-name> The transformation would be applied if the record had at least one header with the given name. For example, the following will apply the transformation on records with at least one header with the name "my-header":
not:<condition-name> This would negate the result of another named condition using the condition config prefix. For example, the following will apply the transformation on records which lack any header with the name my-header:
I foresee one implementation concern with this approach, which is that currently Transformation has to return a fixed ConfigDef, and this proposal would require something more flexible in order to allow the config parameters to depend on the listed transform aliases (and similarly for named predicate used for the not: predicate). I think this could be done by adding a default method to Transformation for getting the ConfigDef given the config, for example.