Details

    Description

      KIP-36 was implemented some time ago in Kafka. This allows brokers and consumers to communicate about the rack (or AWS Availability Zone) they're located in. Reading from a local broker can save money on bandwidth and improve latency for your consumers.

      Flink Kafka consumers currently cannot easily use rack awareness if they're deployed across multiple racks or availability zones, because they have no control over which rack their assigned Task Manager runs in.

      This improvement proposes that a Kafka Consumer could be configured with a callback or Future that runs while the consumer is being configured on the Task Manager and sets the rack value at runtime, if a value is provided.
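      The proposal above can be sketched roughly as follows. This is a minimal, dependency-free illustration and not the connector's actual API: the class `RackAwareConsumerConfig`, the `SerializableSupplier` interface, the `withRack` helper, and the `TM_RACK_ID` environment variable are all hypothetical names chosen for the example. The only real identifier is `client.rack`, the Kafka consumer property that carries the client's rack ID. The point is that the supplier is evaluated on the Task Manager, when the rack is finally known, rather than when the job is defined.

```java
import java.io.Serializable;
import java.util.Properties;
import java.util.function.Supplier;

public class RackAwareConsumerConfig {

    /** Hypothetical: a Supplier that can be serialized and shipped to the Task Manager. */
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    /** Real Kafka consumer property that carries the client's rack ID. */
    public static final String CLIENT_RACK = "client.rack";

    /**
     * Returns a copy of the base properties. If the supplier yields a non-empty
     * rack ID, the copy also contains client.rack; otherwise it is left unset.
     * The base properties are never modified.
     */
    public static Properties withRack(Properties base, SerializableSupplier<String> rackIdSupplier) {
        Properties resolved = new Properties();
        resolved.putAll(base);
        String rackId = (rackIdSupplier == null) ? null : rackIdSupplier.get();
        if (rackId != null && !rackId.isEmpty()) {
            resolved.setProperty(CLIENT_RACK, rackId);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "broker-1:9092");
        base.setProperty("group.id", "example-group");

        // Assumption: each Task Manager exposes its rack/AZ in TM_RACK_ID.
        Properties resolved = withRack(base, () -> System.getenv("TM_RACK_ID"));
        System.out.println(resolved.getProperty(CLIENT_RACK, "<no rack set>"));
    }
}
```

      Leaving `client.rack` unset when no value is available keeps the consumer on its default behavior, which matches the "if a value is provided" wording of the proposal.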

        Activity

          martijnvisser Martijn Visser made changes -
          Issue Type Improvement [ 4 ] New Feature [ 2 ]
          tzulitai Tzu-Li (Gordon) Tai made changes -
          Resolution Fixed [ 1 ]
          Status In Progress [ 3 ] Closed [ 6 ]
          tzulitai Tzu-Li (Gordon) Tai made changes -
          Fix Version/s kafka-3.1.0 [ 12353135 ]

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for driving this to the finish line Jeremy DeGroot.

          Merged to apache/flink-connector-kafka:main with d89a082180232bb79e3c764228c4e7dbb9eb6b8b
          githubbot ASF GitHub Bot made changes -
          Remote Link This issue links to "GitHub Pull Request #53 (Web Link)" [ 332144 ]
          flink-jira-bot Flink Jira Bot made changes -
          Labels pull-request-available pull-request-available stale-assigned
          flink-jira-bot Flink Jira Bot added a comment -

          I am the Flink Jira Bot and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned".
          If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience.
          If you are no longer working on the issue, please unassign yourself so someone else may work on it.

          chiggi_dev Chirag Dewan added a comment -

          Yeah, that makes sense. Thanks Jeremy DeGroot for taking this up. 

          jeremy.degroot Jeremy DeGroot added a comment -

          Chirag Dewan There is a PR open for it now (https://github.com/apache/flink-connector-kafka/pull/20), so hopefully in the next release or two.

          Regarding how we use it: we have an MSK cluster stretched across three AZs and a Flink cluster across those same AZs. Our consumers do a lot of feature extraction, filtering, mapping, and windowing, which significantly reduces the data volume from what is initially read from Kafka. What gets sent to downstream processors and sinks is orders of magnitude smaller and less expensive. If your workflow doesn't reduce intra-cluster traffic as dramatically, you'll probably want to look carefully at the partitioning and chaining choices in your jobs.

          chiggi_dev Chirag Dewan added a comment -

          This is a good thread and a useful feature. Thanks for this.

          When can I expect this to be delivered as a Flink release?

          Also, any more insights on how you used it in your multi-AZ cluster? Are you deploying a stretched Flink cluster or a Flink cluster in every AZ in some active-active or active-passive mode? 

          With a stretched cluster, I assume we will simply replace the broker-consumer cost with the TaskManager-JobManager networking cost. Is this assumption correct?

          jeremy.degroot Jeremy DeGroot made changes -
          Status Open [ 1 ] In Progress [ 3 ]
          githubbot ASF GitHub Bot made changes -
          Remote Link This issue links to "GitHub Pull Request #20 (Web Link)" [ 315932 ]
          githubbot ASF GitHub Bot made changes -
          Labels pull-request-available
          githubbot ASF GitHub Bot made changes -
          Remote Link This issue links to "GitHub Pull Request #22309 (Web Link)" [ 315905 ]
          jeremy.degroot Jeremy DeGroot added a comment -

          Someone on the dev list pointed out I had the wrong KIP referenced here. I fixed it.

          jeremy.degroot Jeremy DeGroot made changes -
          Description: KIP reference changed from [KIP-708|https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams] to [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]; the rest of the description is unchanged.
          renqs Qingsheng Ren added a comment -

          Jeremy DeGroot Thanks for the FLIP! Could you start a discussion thread for this FLIP in the dev mailing list?

          jeremy.degroot Jeremy DeGroot added a comment -

          Mason Chen yeah, I'll get that going this week.

          Also, an update on timing for this. My team are planning to use some self-directed time we have during the holidays due to code freezes to work on this. So we won't be starting until around Thanksgiving, but we should be able to devote some time to it and make rapid progress when we do start.
          mason6345 Mason Chen added a comment -

          Hi Jeremy DeGroot, I also implemented something similar internally and can help with the review. Can you start a discussion thread on the dev mailing list for the FLIP so we can move this forward? You can look at other FLIPs for example discussions.

          jeremy.degroot Jeremy DeGroot added a comment - Here's the FLIP page I made for this https://cwiki.apache.org/confluence/display/FLINK/FLIP-268%3A+Kafka+Rack+Awareness
          jeremy.degroot Jeremy DeGroot added a comment -

          Martijn Visser If I'm reading KIP-392 correctly, that's been implemented to take advantage of metadata like rack awareness. By implementing this, we'd get the benefit of KIP-392 (at least as it relates to the rack ID).

          martijnvisser Martijn Visser added a comment -

          Jeremy DeGroot You should have permission now

          One side question: is something similar necessary for Flink to support KIP-392 (Closest Replica)?

          jeremy.degroot Jeremy DeGroot added a comment -

          Martijn Visser I just signed into the Confluence and I'm unable to create a FLIP page. My username is the same over there.

          martijnvisser Martijn Visser added a comment -

          Let me know if any of you need permissions to create a FLIP page.

          mason6345 Mason Chen added a comment -

          +1 for this feature, it was a lot easier to implement in FlinkKafkaConsumer since you could extend the open method

          renqs Qingsheng Ren made changes -
          Assignee Jeremy DeGroot [ JIRAUSER286379 ]
          renqs Qingsheng Ren added a comment -

          Jeremy DeGroot Thanks for the contribution! Feel free to start a discussion thread in the dev mailing list or create a FLIP page once you are ready. 

          jeremy.degroot Jeremy DeGroot made changes -
          Description: corrected "cannot easily rack awareness" to "cannot easily use rack awareness"; the rest of the description is unchanged.
          jeremy.degroot Jeremy DeGroot added a comment -

          Qingsheng Ren Yes, I'd be happy to lead the design of this feature! 

          We currently run Flink 1.14 in production. We wanted to move from the FlinkKafkaConsumer to the KafkaSource API, and that's what our first POC of this feature was based on. However, we weren't happy with the performance and behavior of the KafkaSource in the version we were running at the time (the 1.14.2 release, if I remember correctly), so we implemented our final version on FlinkKafkaConsumer. I'd be happy to target the KafkaSource API for this.

          renqs Qingsheng Ren added a comment -

          Thanks for starting the discussion Jeremy DeGroot! This is a very interesting and useful feature, as you described.

          Under the design of the FLIP-27 Source API, we expose the host name of source readers when they register with the split enumerator, so the split enumerator can make assignments according to a mapping between racks and reader host names. The file source has already implemented this feature (see LocalityAwareSplitAssigner). Currently the split assignment strategy of the Kafka source is hard-coded, so to achieve this in KafkaSource we need to design a new API that lets users provide a pluggable split assigner for the split enumerator.

          Moreover, a fully optimized solution would have the Flink scheduler also schedule tasks based on locality, but that is beyond the scope of this ticket.

          Jeremy DeGroot I'm not sure if your solution is based on the new KafkaSource instead of the deprecated FlinkKafkaConsumer (we won't add new features to the deprecated one). Would you like to lead the design of this feature? I think a new FLIP is expected for this as we are introducing a new feature to Kafka source. 
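
          The locality-aware assignment idea in this comment could look roughly like the sketch below. It only illustrates the technique and uses no Flink or Kafka classes: `RackLocalSplitAssigner`, its `assign` method, and both input maps are hypothetical, and it assumes the reader host names have already been resolved to rack IDs. Each partition goes to a reader in the same rack when one exists; among equally local readers, the one with the fewest partitions so far wins, with ties broken by the lowest reader ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RackLocalSplitAssigner {

    /**
     * @param partitionRacks partition ID -> rack of the broker to read from (hypothetical input)
     * @param readerRacks    reader subtask ID -> rack of its Task Manager (hypothetical input)
     * @return reader subtask ID -> partition IDs assigned to that reader
     */
    public static Map<Integer, List<Integer>> assign(
            Map<Integer, String> partitionRacks, Map<Integer, String> readerRacks) {
        if (readerRacks.isEmpty()) {
            throw new IllegalArgumentException("at least one reader is required");
        }
        // Sorted copies make the result deterministic.
        Map<Integer, String> readers = new TreeMap<>(readerRacks);
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (Integer reader : readers.keySet()) {
            assignment.put(reader, new ArrayList<>());
        }

        for (Map.Entry<Integer, String> partition : new TreeMap<>(partitionRacks).entrySet()) {
            String partitionRack = partition.getValue();
            Integer best = null;
            boolean bestIsLocal = false;
            for (Map.Entry<Integer, String> reader : readers.entrySet()) {
                boolean local = partitionRack != null && partitionRack.equals(reader.getValue());
                int load = assignment.get(reader.getKey()).size();
                boolean better = best == null
                        || (local && !bestIsLocal)
                        || (local == bestIsLocal && load < assignment.get(best).size());
                if (better) {
                    best = reader.getKey();
                    bestIsLocal = local;
                }
            }
            assignment.get(best).add(partition.getKey());
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, String> partitions = Map.of(0, "az-a", 1, "az-b", 2, "az-a", 3, "az-c");
        Map<Integer, String> readers = Map.of(0, "az-a", 1, "az-b");
        System.out.println(assign(partitions, readers));
    }
}
```

          Partitions with no reader in their rack fall back to the least-loaded reader, so every partition is still consumed even when locality cannot be achieved.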

          martijnvisser Martijn Visser added a comment -

          Qingsheng Ren What do you think?

          jeremy.degroot Jeremy DeGroot added a comment -

          I'll provide a little further justification and background for this in a comment. At my job we were tasked with reducing our AWS spend, and one place we found that could be improved was Inter-AZ bandwidth. We implemented something similar to what I describe above, and realized significant savings (bringing our billable bandwidth from 60% of the total down to 20%). It seems likely other people would also like to save money in this fashion. If this gets taken up, we'd also be willing to provide our implementation as a basis for development.

          jeremy.degroot Jeremy DeGroot created issue -

          People

            jeremy.degroot Jeremy DeGroot
            jeremy.degroot Jeremy DeGroot