KAFKA-3209: Support single message transforms in Kafka Connect

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.2.0
    • Component/s: KafkaConnect

      Description

      Users should be able to perform light transformations on messages between a connector and Kafka. This is needed because some transformations must be performed before the data hits Kafka (e.g. filtering certain types of events or PII filtering). It's also useful for very light, single-message modifications that are easier to perform inline with the data import/export.
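As a rough illustration of the light map() and filter() style transformations described here (dropping certain event types, masking PII), the following is a minimal Java sketch. The `SimpleTransform` interface, the `TransformChain` class, and the `type`/`ssn` field names are hypothetical and are not the Kafka Connect API. Records are modeled as plain `Map<String, Object>` values, and returning `null` stands for "drop this record".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical single-message transform (not the Kafka Connect API).
// A transform returns the (possibly new) record, or null to drop it.
interface SimpleTransform extends UnaryOperator<Map<String, Object>> {}

final class TransformChain {
    private final List<SimpleTransform> transforms;

    TransformChain(List<SimpleTransform> transforms) {
        this.transforms = List.copyOf(transforms);
    }

    // Runs the transforms in order; stops as soon as one drops the record.
    Map<String, Object> apply(Map<String, Object> record) {
        Map<String, Object> current = record;
        for (SimpleTransform t : transforms) {
            current = t.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    // filter(): drops events whose "type" field equals the given value.
    static SimpleTransform dropType(String type) {
        return r -> type.equals(r.get("type")) ? null : r;
    }

    // map(): masks a PII field in a copy, leaving the input record untouched.
    static SimpleTransform maskField(String field) {
        return r -> {
            if (!r.containsKey(field)) {
                return r;
            }
            Map<String, Object> copy = new LinkedHashMap<>(r);
            copy.put(field, "****");
            return copy;
        };
    }
}
```

Because each transform in this sketch sees exactly one record and keeps no cross-record state, a chain like this could run inline in a connector task, before records reach Kafka or after they are read from it.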


          Activity

          snisarg Nisarg Shah added a comment -

          Hey, I'm looking to contribute to open source and want to take this up.

          gwenshap Gwen Shapira added a comment -

          Thank you Nisarg. I believe Ewen Cheslack-Postava already started working on this one...

          ewencp Ewen Cheslack-Postava added a comment -

          Nisarg Shah I haven't started work in earnest. This JIRA may not end up being particularly complicated code-wise, but since it will introduce a new public API, could have a significant impact, and must work for all the transformations we want to support, it'll need a KIP proposal and discussion. (Though an initial prototype patch might also help drive that discussion.)

          If you're interested in picking this up, I'd be happy to guide you through the process of writing up the KIP.

          snisarg Nisarg Shah added a comment -

          That does sound good. I realise it is not very complicated, but since I haven't done much open source work, I thought I'd start with something like this. I also think it is useful nonetheless.

          Skandragon Michael Graff added a comment -

          We have a use case which is in effect "take a single Kafka input message and produce 1 or more output items" – I understand people refer to this as a "flatMap" or something like that.

          Specifically, our use case is to get a chunk formatted like this:

          Header-1: value1
          Header-2: value2

          record1
          record2
          record3
          ...
          records

          Our transformation is to take some header fields and merge them into a record, then emit that record for all (or some) records in the message.
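A minimal Java sketch of this flatMap, assuming the chunk arrives as a single string in exactly the layout shown above (`Name: value` header lines, a blank line, then one record per line). `HeaderFlatMap` and the output field name `record` are hypothetical; for simplicity the sketch copies every header into every record rather than selecting only some headers or records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flatMap: one input chunk -> many output records.
final class HeaderFlatMap {
    static List<Map<String, String>> explode(String chunk) {
        String[] lines = chunk.split("\\r?\\n", -1);
        Map<String, String> headers = new LinkedHashMap<>();
        int i = 0;
        // Header section: "Name: value" lines up to the first blank line.
        for (; i < lines.length && !lines[i].trim().isEmpty(); i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                headers.put(lines[i].substring(0, colon).trim(),
                            lines[i].substring(colon + 1).trim());
            }
        }
        List<Map<String, String>> out = new ArrayList<>();
        // Body: skip the blank separator, then emit one record per non-blank
        // line, each carrying its own copy of the merged header fields.
        for (i++; i < lines.length; i++) {
            String body = lines[i].trim();
            if (body.isEmpty()) {
                continue;
            }
            Map<String, String> rec = new LinkedHashMap<>(headers);
            rec.put("record", body);
            out.add(rec);
        }
        return out;
    }
}
```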

          ewencp Ewen Cheslack-Postava added a comment -

          To help clarify Michael Graff's comment a bit, the idea is that the records are going to be small compared to the headers. This means that the approach we might normally suggest – doing the flatMap transformation with an application or stream processor, storing that data back to Kafka, then using Connect to store the data to another system – will have very high overhead.

          Whereas most of the message transforms we've discussed so far are either simple map() or filter() transformations, this is a case where we might want to generate multiple output messages from a single input message. The API for supporting this is obviously straightforward – just support returning a list of messages from the transformation instead of a single message. However, I think the main challenge is that message offsets either aren't unique anymore or we'd need to extend the concept of offset to account for "sub-messages".
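One way to picture the "sub-message" idea is to pair the original Kafka offset with an index into the expanded output and order the pairs lexicographically. The `SubOffset` class below is a hypothetical Java sketch of that extension, not something Kafka provides; it only shows that the pair stays unique and totally ordered even when one input message produces several outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical extended offset: (original message offset, index of derived message).
final class SubOffset implements Comparable<SubOffset> {
    final long offset;
    final int subIndex;

    SubOffset(long offset, int subIndex) {
        this.offset = offset;
        this.subIndex = subIndex;
    }

    // Assigns one SubOffset to each of the `count` messages derived from `offset`.
    static List<SubOffset> expand(long offset, int count) {
        List<SubOffset> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(new SubOffset(offset, i));
        }
        return result;
    }

    // Order by the original offset first, then by position within that message.
    @Override
    public int compareTo(SubOffset other) {
        int byOffset = Long.compare(offset, other.offset);
        return byOffset != 0 ? byOffset : Integer.compare(subIndex, other.subIndex);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SubOffset)) {
            return false;
        }
        SubOffset s = (SubOffset) o;
        return offset == s.offset && subIndex == s.subIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(offset, subIndex);
    }

    @Override
    public String toString() {
        return offset + "." + subIndex;
    }
}
```

Under this sketch, the plain Kafka offset could only be treated as fully processed once the last sub-message derived from it has been handled, which is one concrete form of the bookkeeping problem described above.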

          pablocasares Pablo Casares added a comment -

          Hey, I'm interested in producing keys based on JSON fields. I was reading https://groups.google.com/forum/#!topic/confluent-platform/aVaqBtMiKkY and they suggest this improvement. I think it could help me, and I can contribute to this improvement if you want. What is the current status of this task?

          Regards!

          Pablo
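Deriving a record key from fields of the value is a natural single-message transform (a ValueToKey SMT appears among the PR commits later in this issue). The following is only a hypothetical Java illustration, not that transform's code: it assumes the JSON value has already been parsed into a `Map<String, Object>`, and `KeyedRecord` and `FieldsToKey` are invented names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record with a key and an already-parsed JSON value.
final class KeyedRecord {
    final Object key;
    final Map<String, Object> value;

    KeyedRecord(Object key, Map<String, Object> value) {
        this.key = key;
        this.value = value;
    }
}

final class FieldsToKey {
    // Builds a new key from the named value fields; the value passes through unchanged.
    static KeyedRecord apply(KeyedRecord record, List<String> fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String field : fields) {
            if (!record.value.containsKey(field)) {
                throw new IllegalArgumentException("value has no field: " + field);
            }
            key.put(field, record.value.get(field));
        }
        return new KeyedRecord(key, record.value);
    }
}
```

Returning the key as a map keeps multi-field keys possible in this sketch; a use case that needs a primitive key would pull out the single field value instead.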

          ewencp Ewen Cheslack-Postava added a comment -

          Pablo Casares I don't think anybody is actively working on this – Nisarg had started, but I'm not sure whether they have dropped work on it for now since we haven't seen anything for a while. The first step would be to sketch out a design and put together a KIP. We'll want to make sure the use cases, and how the proposed interfaces work for them, are clearly documented. Especially important will be discussion around configuration – how will transforms be configured in the Connector configs, possibly allowing for multiple transformations while keeping configuration easy? Additionally, if we are going to ship any commonly needed transformations with Kafka (dropping & renaming fields, flattening hierarchical structures, filters), we'll probably want to include a full description of that set in the KIP as well. (Alternatively, we might not ship any, but then they obviously aren't as useful out of the box – like connectors and converters, you'll need to pull in separate jars for transforms.) I imagine the implementation will be pretty straightforward once we've got it specced out.
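To make the configuration question concrete, one possible shape is an ordered list of aliases under `transforms`, with each alias's class under `transforms.<alias>.type` and its remaining settings under `transforms.<alias>.*`; the SMT configuration Connect later released uses a similar alias scheme. The Java below is a hypothetical sketch of parsing such a flat connector config: `TransformSpec` and `TransformConfigParser` are invented names, and it stops at producing specs rather than instantiating any classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parsed description of one configured transform.
final class TransformSpec {
    final String alias;
    final String type;
    final Map<String, String> props;

    TransformSpec(String alias, String type, Map<String, String> props) {
        this.alias = alias;
        this.type = type;
        this.props = props;
    }
}

final class TransformConfigParser {
    // Reads "transforms=a,b" plus "transforms.<alias>.*" keys, preserving list order.
    static List<TransformSpec> parse(Map<String, String> config) {
        List<TransformSpec> specs = new ArrayList<>();
        String aliases = config.getOrDefault("transforms", "").trim();
        if (aliases.isEmpty()) {
            return specs;
        }
        for (String raw : aliases.split(",")) {
            String alias = raw.trim();
            String prefix = "transforms." + alias + ".";
            String type = config.get(prefix + "type");
            if (type == null) {
                throw new IllegalArgumentException("missing " + prefix + "type");
            }
            // Everything else under the alias prefix is handed to that transform.
            Map<String, String> props = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : config.entrySet()) {
                String key = e.getKey();
                if (key.startsWith(prefix) && !key.equals(prefix + "type")) {
                    props.put(key.substring(prefix.length()), e.getValue());
                }
            }
            specs.add(new TransformSpec(alias, type, props));
        }
        return specs;
    }
}
```

The alias list carries the ordering, so several instances of the same transform class can be chained with different settings while each one's keys stay grouped under its own prefix.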

          snisarg Nisarg Shah added a comment -

          I'm still working on it; I just got a little busy. I thought I'd get a working prototype ready first, so that it can be demonstrated and it's easier to convey how it will work.
          It's almost ready; I'm working out small kinks in passing properties down to the transformers behind the scenes. I will be sending out emails to the dev channel soon.

          snisarg Nisarg Shah added a comment -

          Please give your comments and suggestions here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Add+Kafka+Connect+Transformers+to+allow+transformations+to+messages

          jjchorrobe Juan Chorro added a comment -

          Hi! I'm interested in this feature and would like to know whether anybody is working on this issue.

          snisarg Nisarg Shah added a comment -

          Hey Juan. I have started working on this; I've just been busy lately and will pick it up again within a few weeks. You could help, though. Have a look at the KIP (link above) and let's discuss suggestions and implementations.

          shikhar Shikhar Bhushan added a comment -

          Nisarg Shah and Juan Chorro, I revived the discussion thread, and I'd welcome your thoughts there on this proposal: https://cwiki.apache.org/confluence/display/KAFKA/Connect+Transforms+-+Proposed+Design

          snisarg Nisarg Shah added a comment -

          Shikhar Bhushan the proposal looks detailed and flexible in terms of what it can do. I see you've assigned this to yourself; would you also like to update KIP-66?

          shikhar Shikhar Bhushan added a comment -

          Thanks Nisarg Shah. I self-assigned it as I don't believe you were actively working on it and the ticket was unassigned, but I'd be happy to collaborate if you have time.

          Yes, I think we should continue the work by updating KIP-66. I'm working on drafting the proposal into KIP form and I'll send a ML update when it's ready.

          snisarg Nisarg Shah added a comment -

          Yeah sorry about that, I got caught up with school work. You would have a better depth of understanding of this problem as I haven't used this part in a professional environment. I would gladly collaborate with you.

          githubbot ASF GitHub Bot added a comment -

          GitHub user shikhar opened a pull request:

          https://github.com/apache/kafka/pull/2299

          KAFKA-3209: KIP-66: single message transforms

          WIP, particularly around testing

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shikhar/kafka smt-2017

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/2299.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2299


          commit 1178670f36d8fbdf5cbdbb2728ace7bf4f0e7300
          Author: Shikhar Bhushan <shikhar@confluent.io>
          Date: 2017-01-03T19:21:17Z

          KAFKA-3209: KIP-66: single message transforms


          githubbot ASF GitHub Bot added a comment -

          GitHub user shikhar opened a pull request:

          https://github.com/apache/kafka/pull/2374

          KAFKA-3209: KIP-66: more single message transforms

          WIP, in this PR I'd also like to add doc generation for transformations.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shikhar/kafka more-smt

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/2374.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2374


          commit f34cc71c9931ea7ec5dd045512c623196928a2a3
          Author: Shikhar Bhushan <shikhar@confluent.io>
          Date: 2017-01-13T20:00:31Z

          SetSchemaMetadata SMT

          commit 09af66b3f1861bf2a0718aaf79ca1a3bd22adcec
          Author: Shikhar Bhushan <shikhar@confluent.io>
          Date: 2017-01-13T21:44:57Z

          Support schemaless and rename HoistToStruct->Hoist; add the inverse Extract transform

          commit 022f4920c5f09d068bbf49e47091a1333dc48be2
          Author: Shikhar Bhushan <shikhar@confluent.io>
          Date: 2017-01-13T21:51:43Z

          InsertField transform – fix bad name for interface containing config name constants

          commit c5260a718e2f0ade66c4607a4b9c21abda61b90c
          Author: Shikhar Bhushan <shikhar@confluent.io>
          Date: 2017-01-13T22:01:25Z

          ValueToKey SMT


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/2374

          ewencp Ewen Cheslack-Postava added a comment -

          Fixed in https://github.com/apache/kafka/pull/2374

          githubbot ASF GitHub Bot added a comment -

          GitHub user ewencp opened a pull request:

          https://github.com/apache/kafka/pull/2458

          KAFKA-3209: KIP-66: Flatten and Cast single message transforms

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/ewencp/kafka kafka-3209-even-more-transforms

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/2458.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2458


          commit 14692133dc80552410b2b0eac76b9fca8a73afe1
          Author: Ewen Cheslack-Postava <me@ewencp.org>
          Date: 2017-01-27T07:26:02Z

          KAFKA-3209: KIP-66: Flatten and Cast single message transforms
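The Flatten transform in this PR targets the "flattening hierarchical structures" case raised earlier in the thread. As a hypothetical, schemaless illustration only (not the code in the PR), nested maps can be flattened by joining field names level by level with a delimiter, assumed here to be `.`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical schemaless flattener: {"a": {"b": 1}} -> {"a.b": 1}.
final class FlattenSketch {
    static Map<String, Object> flatten(Map<String, Object> value, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", value, delimiter, out);
        return out;
    }

    // Recurses into nested maps, prefixing each field name with its parent path.
    private static void flattenInto(String prefix, Map<?, ?> value, String delimiter,
                                    Map<String, Object> out) {
        for (Map.Entry<?, ?> e : value.entrySet()) {
            String name = prefix.isEmpty()
                    ? String.valueOf(e.getKey())
                    : prefix + delimiter + e.getKey();
            if (e.getValue() instanceof Map) {
                flattenInto(name, (Map<?, ?>) e.getValue(), delimiter, out);
            } else {
                out.put(name, e.getValue());
            }
        }
    }
}
```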


          ewencp Ewen Cheslack-Postava added a comment -

          Moved the remaining transforms under a new JIRA marked with the next release as fix version: https://issues.apache.org/jira/browse/KAFKA-4714


            People

            • Assignee: Ewen Cheslack-Postava (ewencp)
            • Reporter: Neha Narkhede (nehanarkhede)
            • Reviewer: Ewen Cheslack-Postava
            • Votes: 5
            • Watchers: 12
