Description
With the following Single Message Transform:
"transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.ExtractId.field":"id"
Kafka Connect fails with:
java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
There should be a better error message here, identifying the reason for the NPE.
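One plausible explanation for an NPE at this point is a field lookup that returns null for a missing field and is then dereferenced without a check. The following self-contained sketch models that failure mode, with a plain `Map` standing in for the key schema; `NpeModel` and `extractFieldType` are hypothetical names, and this is not the actual `ExtractField` source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a field lookup inside an SMT.
// A plain Map stands in for a key schema: field name -> field type.
class NpeModel {

    // Looks up the named field and dereferences the result without a null check.
    // Map.get returns null for a missing key, so the method call below throws an NPE.
    static String extractFieldType(Map<String, String> keySchema, String fieldName) {
        String fieldType = keySchema.get(fieldName); // null if the field does not exist
        return fieldType.toUpperCase();              // NullPointerException for a missing field
    }

    public static void main(String[] args) {
        Map<String, String> customerKey = new HashMap<>();
        customerKey.put("id", "int32");
        System.out.println(extractFieldType(customerKey, "id")); // INT32

        // A key that has no "id" field at all
        Map<String, String> otherKey = new HashMap<>();
        otherKey.put("databaseName", "string");
        try {
            extractFieldType(otherKey, "id");
        } catch (NullPointerException e) {
            System.out.println("NPE, with no hint that field 'id' is missing");
        }
    }
}
```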
Version: Confluent Platform 4.1.1
Issue Links
- is related to: KAFKA-8523 InsertField transformation fails when encountering tombstone event (Resolved)
Activity
I'm wondering: is it solely a question of having a more meaningful exception, or should null rather be returned in this case? It seems one might want either, depending on the use case.
Looking into this, and it's an interesting case. The problem arises when the SMT gets one of the schema change events from the Debezium MySQL connector, which don't have the same primary key structure as, for instance, your customers table. So I think it's actually not your intention to apply the SMT to these messages to begin with.
Question is how to deal with this case; I could see these options when encountering a message which doesn't have the given field:
- raise a more meaningful exception than NPE (pretty disruptive)
- return null
- leave the key/value unchanged
I think for your use case, the last option is the most useful one. But returning null might also make sense in others. Perhaps this requires a configuration option?
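To make the three options concrete, here is a minimal sketch of what each would do for a key that lacks the requested field. It models record keys as plain `Map`s rather than Connect `Struct`s, and the enum and method names are hypothetical, not part of the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the three possible reactions to a record that lacks the requested field.
// Keys are modelled as plain Maps; all names here are hypothetical.
class MissingFieldOptions {

    enum OnMissingField { FAIL, RETURN_NULL, LEAVE_UNCHANGED }

    // Returns the extracted field value, or applies the chosen option if the field is absent.
    static Object extract(Map<String, Object> key, String fieldName, OnMissingField option) {
        if (key.containsKey(fieldName)) {
            return key.get(fieldName);
        }
        switch (option) {
            case FAIL:
                throw new IllegalArgumentException(
                    "Field '" + fieldName + "' not found in key " + key.keySet());
            case RETURN_NULL:
                return null;
            case LEAVE_UNCHANGED:
            default:
                return key; // pass the original key through untouched
        }
    }

    public static void main(String[] args) {
        Map<String, Object> schemaChangeKey = new HashMap<>();
        schemaChangeKey.put("databaseName", "inventory");
        System.out.println(extract(schemaChangeKey, "id", OnMissingField.RETURN_NULL));     // null
        System.out.println(extract(schemaChangeKey, "id", OnMissingField.LEAVE_UNCHANGED)); // {databaseName=inventory}
    }
}
```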
Thanks gunnar.morling, this put me on the right track. In this particular case, to get this working I set
"include.schema.changes": "false",
In general, though, wouldn't a more descriptive error (e.g. "ExtractField : field 'id' not found in Key") be more useful to users? Why would this be more disruptive? It would certainly make troubleshooting easier.
Hey rmoff, what I meant is that raising any exception is rather disruptive here. We could certainly improve the exception message, but I don't think it would really help you, as the connector would still end up in the FAILED state when encountering a schema change event. I think your case would be best addressed if the SMT just left the key/value as-is when the extract field operation cannot be applied.
Taking a step back, this could also indicate a more general shortcoming of Connect: SMTs applied to a source connector such as Debezium are applied to all topics, whereas it may actually be desirable to apply them only to a subset of the topics produced by the connector (in the case of Debezium, the actual change data topics, while schema change and heartbeat topics shouldn't be targeted). Perhaps worth a KIP? tombentley, WDYT?
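Improving only the message, while keeping the failure, could look roughly like the sketch below. It assumes a map-based stand-in for the record key or value; `DescriptiveExtractField`, `extractOrFail` and the `Side` enum are hypothetical names, and the message format follows the example suggested above rather than any released Kafka version. It also guards against a null key or value, as with tombstones (cf. KAFKA-8523).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: keep the failing behaviour, but replace the bare NPE with a descriptive
// exception. Records are modelled as Maps; all names here are hypothetical.
class DescriptiveExtractField {

    // Which part of the record the transform operates on (cf. ExtractField$Key / $Value).
    enum Side { KEY, VALUE }

    static Object extractOrFail(Map<String, Object> struct, String fieldName, Side side) {
        if (struct == null || !struct.containsKey(fieldName)) {
            String sideName = side == Side.KEY ? "Key" : "Value";
            String available = struct == null ? "none (record part is null)" : struct.keySet().toString();
            throw new IllegalArgumentException(
                "ExtractField : field '" + fieldName + "' not found in " + sideName
                    + "; available fields: " + available);
        }
        return struct.get(fieldName);
    }

    public static void main(String[] args) {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("databaseName", "inventory");
        try {
            extractOrFail(key, "id", Side.KEY);
        } catch (IllegalArgumentException e) {
            // ExtractField : field 'id' not found in Key; available fields: [databaseName]
            System.out.println(e.getMessage());
        }
    }
}
```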
I agree that throwing an exception here can be disruptive. You've listed three possible actions, (exception, null, leave alone). I don't think there is a right answer here. We currently throw an exception, which may be a pain for some users. But switching this to, say, leave the message alone, would be a breaking change and might not work for other users. With no right solution, it feels like this should be configurable.
Yes, that'd certainly work for me and I'd be happy to send a PR. Perhaps behavior.on.non.existant.field = (fail|drop|passon)?
gunnarmorling commented on pull request #8059: KAFKA-7052 Adding option to ExtractField SMT for controlling behavior in case of non-existent fields
URL: https://github.com/apache/kafka/pull/8059
https://issues.apache.org/jira/browse/KAFKA-7052
More detailed description of your change
n/a
Summary of testing strategy (including rationale)
Added JUnit test
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@rhauch, hey, that's a quick first attempt at making the `ExtractField` SMT more flexible when it encounters a record that doesn't contain the specified field. It's a common situation for connectors like Debezium, which produce different "kinds" of records/topics; in our case, e.g., actual change event topics and meta-topics such as TX data or schema history. One might want to apply the `ExtractField` SMT to the CDC records but not to those others. Also see KAFKA-7052 (https://issues.apache.org/jira/browse/KAFKA-7052) for some background.
The proposal is to add a new option `behavior.on.non.existent.field` to the SMT which makes the behavior configurable. Its supported values are:
- fail: raise an exception (default for records with schema)
- return-null: return null (default for records without schema)
- pass-on: pass on the unmodified original record
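The schema-dependent defaults are what keep today's behavior unchanged when the option is not set. A sketch of how the option value could be parsed and resolved, assuming the value names listed above; the class, enum, and method names are hypothetical and this is not the code from PR #8059.

```java
import java.util.Locale;

// Sketch of resolving the proposed behavior.on.non.existent.field option.
// Value names come from the proposal; the parsing code itself is hypothetical.
class NonExistentFieldBehavior {

    enum Behavior {
        FAIL("fail"), RETURN_NULL("return-null"), PASS_ON("pass-on");

        final String configValue;

        Behavior(String configValue) {
            this.configValue = configValue;
        }

        // Case-insensitive, whitespace-tolerant lookup; unknown values are rejected.
        static Behavior parse(String value) {
            String normalized = value.trim().toLowerCase(Locale.ROOT);
            for (Behavior b : values()) {
                if (b.configValue.equals(normalized)) {
                    return b;
                }
            }
            throw new IllegalArgumentException(
                "Unsupported value for behavior.on.non.existent.field: " + value);
        }
    }

    // An explicit setting wins; otherwise fall back to today's behaviour:
    // fail for records with a schema, return null for schemaless records.
    static Behavior resolve(String configuredValue, boolean recordHasSchema) {
        if (configuredValue != null) {
            return Behavior.parse(configuredValue);
        }
        return recordHasSchema ? Behavior.FAIL : Behavior.RETURN_NULL;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, true));      // FAIL
        System.out.println(resolve(null, false));     // RETURN_NULL
        System.out.println(resolve("pass-on", true)); // PASS_ON
    }
}
```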
I did a quick implementation of that proposal to foster feedback. Happy to adjust and expand as needed, e.g. to align the option name and its values with existing naming patterns, and to add docs (not sure where those would go). Thanks!
CC @rmoff, @big-andy-coates.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Thanks for the discussion, everyone. As soon as we introduce a configuration option on a Connect-provided SMT, we need a KIP and we can't backport the change.
I would suggest the following:
- One PR to improve the error message per rmoff's earlier comment (e.g., "ExtractField : field 'id' not found in Key") but otherwise not change the behavior. Alternatively, we make the message better and decide that "skip" is an appropriate fix rather than an NPE; this may need more discussion, but I think an NPE is a bad UX and probably rarely useful. This PR could be merged to trunk and backported to 2-3 branches (standard practice).
- Create a new Jira issue and small KIP and another PR to add the proposed ExtractField configuration property, such as "behavior.on.non.existant.field = (fail|drop|skip)". Note I'm suggesting "skip" rather than "passon" to keep things simple. gunnar.morling's PR would apply to this new Jira issue and KIP rather than to this issue.
- Create a new Jira issue and KIP to define an optional topic filter property for each transformation that would specify a regex pattern matching the topics to which the SMT should apply, defaulting to ".*" to maintain the current behavior. I'm not sure how feasible it will be to ensure this never clashes with SMT-specific properties, but it's worth investigating.
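The topic filter idea could work roughly like the following wrapper, which applies a transform only to records whose topic fully matches a regex and passes all other records through unchanged. This is a sketch under assumptions: the record type is a minimal stand-in rather than Connect's `ConnectRecord`, no such property exists in Connect at this point, and the class, method, and topic names (`dbserver1`, `dbserver1.inventory.customers`) are hypothetical.

```java
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

// Sketch of a per-transformation topic filter. The default ".*" matches every
// topic, preserving current behaviour. All names here are hypothetical.
class TopicFilterSketch {

    // Minimal stand-in for a Connect record: just a topic name and a value.
    static final class SimpleRecord {
        final String topic;
        final Object value;

        SimpleRecord(String topic, Object value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Applies `transform` only when the record's topic fully matches `topicRegex`.
    static UnaryOperator<SimpleRecord> filtered(String topicRegex, UnaryOperator<SimpleRecord> transform) {
        Pattern pattern = Pattern.compile(topicRegex == null ? ".*" : topicRegex);
        return record -> pattern.matcher(record.topic).matches() ? transform.apply(record) : record;
    }

    public static void main(String[] args) {
        UnaryOperator<SimpleRecord> upperCase =
            r -> new SimpleRecord(r.topic, String.valueOf(r.value).toUpperCase());
        // Only change-event topics are targeted; the schema change topic is left alone
        UnaryOperator<SimpleRecord> onlyTables = filtered("dbserver1\\.inventory\\..*", upperCase);

        System.out.println(onlyTables.apply(new SimpleRecord("dbserver1.inventory.customers", "a")).value); // A
        System.out.println(onlyTables.apply(new SimpleRecord("dbserver1", "b")).value); // b
    }
}
```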
Important: We've already missed the KIP acceptance deadline for AK 2.5, so the earliest #2 and #3 could appear is AK 2.6.
WDYT?
On second thought, I'm not sure we can change the behavior without risking backward-incompatible changes.
First, the current behavior when the SMT can't be applied is to fail with an NPE. We try to use meaningful exceptions with useful error messages, and the lack of any code to check for this situation likely means this case was simply not considered and throwing an NPE is unintentional. This is an argument for changing the behavior to skip any record for which the specified field is not found.
Second, it's probably not practical for users to rely upon this existing NPE behavior when used with a source connector, since that would leave the connector in a failed state without advancing offsets. Essentially, the connector would be stuck and unable to continue unless the configuration is changed.
However, if someone were to use this SMT with a sink connector and use the DLQ functionality, they might be relying upon the NPE to signal that the record should go to the DLQ. If we were to change the behavior, the record would no longer go to the DLQ and instead would get sent to any subsequent transformation and ultimately to the sink connector.
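The DLQ argument can be illustrated with a simplified sink pipeline in which a failing transform sends the record to the DLQ (in Connect, this routing is enabled with sink settings such as `errors.tolerance=all` and `errors.deadletterqueue.topic.name`). The sketch below is a plain-Java model, not the Connect runtime, and its method names are hypothetical. With the current failing behavior, the record missing `id` ends up in the DLQ; with a hypothetical skip behavior, the same record reaches the sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of a sink pipeline with a dead letter queue (DLQ): a record whose
// transform throws is routed to the DLQ, anything else reaches the sink.
class DlqRoutingSketch {

    static void process(List<Map<String, Object>> records,
                        Function<Map<String, Object>, Object> transform,
                        List<Object> sink,
                        List<Map<String, Object>> dlq) {
        for (Map<String, Object> record : records) {
            Object transformed;
            try {
                transformed = transform.apply(record);
            } catch (RuntimeException e) {
                dlq.add(record); // the exception is what signals "send this record to the DLQ"
                continue;
            }
            sink.add(transformed);
        }
    }

    // Current behaviour: a missing field raises an exception.
    static Object extractOrFail(Map<String, Object> record) {
        if (!record.containsKey("id")) {
            throw new IllegalArgumentException("field 'id' not found");
        }
        return record.get("id");
    }

    // Hypothetical "skip" behaviour: a record without the field is passed on unchanged.
    static Object extractOrSkip(Map<String, Object> record) {
        return record.containsKey("id") ? record.get("id") : record;
    }

    public static void main(String[] args) {
        Map<String, Object> withId = new HashMap<>();
        withId.put("id", 1);
        Map<String, Object> withoutId = new HashMap<>();
        withoutId.put("name", "x");
        List<Map<String, Object>> batch = Arrays.asList(withId, withoutId);

        List<Object> sink = new ArrayList<>();
        List<Map<String, Object>> dlq = new ArrayList<>();
        process(batch, DlqRoutingSketch::extractOrFail, sink, dlq);
        System.out.println(sink.size() + " to sink, " + dlq.size() + " to DLQ"); // 1 to sink, 1 to DLQ

        sink.clear();
        dlq.clear();
        process(batch, DlqRoutingSketch::extractOrSkip, sink, dlq);
        System.out.println(sink.size() + " to sink, " + dlq.size() + " to DLQ"); // 2 to sink, 0 to DLQ
    }
}
```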
Therefore, changing to have the SMT skip any record for which the field is not found is technically not a backward compatible change, and this would require a KIP or, better yet, require introducing a new configuration property (as mentioned in my previous comment) that would default to "fail" to maintain backward compatibility.
Thoughts?
Thanks for commenting, rhauch! My intention was to actually keep the current behavior by means of the default setting of the new option:
- "fail" for records with a schema (it's raising an IllegalArgumentException, but it could be an NPE of course if the exact exception type is a concern; I think it shouldn't be, as the original NPE really is a weakness of the existing implementation that shouldn't be relied upon)
- "return-null" for records without schema
I.e. without any explicit setting, the behavior will be exactly the same as today (ignoring the changed exception type). That's why I didn't assume this would need a KIP, but as per what you're saying, any new option mandates a KIP?
Any change to an API requires a KIP, even if it's a backward compatible change (which most are).
"fail" for records with a schema (it's raising an IllegalArgumentException but could be NPE of course if the exact exception type is a concern; I think it shouldn't, as the original NPE really is a weakness of the existing implementation that shouldn't be relied upon
I'm not sure that the type of exception matters, and an IAE would be clearer.
I agree that even just improving the error message would be a good first-step here.
rhauch commented on pull request #8059: KAFKA-7052 Avoiding NPE in ExtractField SMT in case of non-existent fields
URL: https://github.com/apache/kafka/pull/8059
Tested on Debezium 0.8 (AK 1.1.0) and Debezium 0.7.4 (AK 1.0.0) Docker images.
Connector:
Source schema + data in MySQL: