[FLINK-31655] Adaptive Channel selection for partitioner

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component: Runtime / Task

    Description

      In Flink, if the upstream and downstream operator parallelism differs, the RebalancePartitioner is used by default to select the target channel.

      In our company, users often use Flink to access Redis, HBase, or other RPC services. If some operators are slow to return requests (for reasons on the external service side), the job easily runs into backpressure, because Rebalance/Rescale select channels in a round-robin fashion.

      Since the Rebalance/Rescale policies do not care which downstream subtask the data is sent to, we would like Rebalance/Rescale to take the processing capacity of the downstream subtasks into account when choosing a channel.

      Sending more data to the idle subtasks ensures the best possible throughput for the job!
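
      For context, the round-robin selection used by Rebalance/Rescale is roughly equivalent to the following simplified sketch (an illustration of the policy only, not the actual Flink source; the class and field names are made up):

      public class RoundRobinChannelSelector {

          private final int numberOfChannels;
          private int nextChannelToSendTo = -1;

          public RoundRobinChannelSelector(int numberOfChannels) {
              this.numberOfChannels = numberOfChannels;
          }

          // Every record simply goes to the next channel in order, regardless of how
          // loaded the downstream subtask behind that channel currently is.
          public int selectChannel() {
              nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
              return nextChannelToSendTo;
          }
      }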

       

       

       

      Attachments

        Activity

          tartarus tartarus added a comment -

          gaoyunhaii fanrui Please take a look at this issue. After our company adopted the adaptive Rebalance/Rescale, job throughput improved by about 20% on average compared to the community version.

          We can discuss how we can contribute this feature to the community!

           

          please assign to me, thanks~

          tartarus tartarus added a comment -

          cc zhuzh 

          fanrui Rui Fan added a comment -

          Hi tartarus , thanks for the good proposal, it's really a useful feature for flink users. And I see it has been assigned to you.

          Would you mind using a Google doc to sort out the design first? It could include:

          • Motivation
          • User Interface: How do Flink users use this feature?
          • Judgment of pressure: How to judge the downstream channel pressure?
          • Rule for picking channel: How to pick a channel?
          gaoyunhaii Yun Gao added a comment -

          Hi tartarus thanks for the proposal, it also looks useful from my side. 

          There has also been some discussion about this in the community previously, and a few points were raised, e.g. that we should not introduce new locks, and that we need to carefully design the structure that maintains the active channels to avoid additional overhead. So I also think that, if convenient, you could propose a design doc so that we can first reach consensus on the overall design.

          Also cc pltbkd , who has also implemented the functionality previously in the internal version. 

          tartarus tartarus added a comment -

          Hi gaoyunhaii fanrui 

          Thanks for your suggestions, I will produce a design doc as soon as possible.


          akalash Anton Kalashnikov added a comment -

          Hi tartarus, thanks for this discussion.

          I think I remember the same discussion that gaoyunhaii mentioned above. I just want to add that I tried to implement it as well; you can take a look at it here: https://github.com/apache/flink/pull/16224. My solution works without any extra synchronization or locks, but the performance of the other (non-adaptive) rebalances was impacted anyway. I only changed `BufferWritingResultPartition#emitRecord` and added a non-volatile variable to `PipelinedSubpartition`, which was enough to cause a small degradation in the benchmark, since it is a hot path.
          I don't remember exactly why we stopped discussing it, but we should be careful here: the adaptive rebalance is not such a common use case, and it would be unfortunate if we slowed down the more common rebalances in favor of this less common one.

          tartarus tartarus added a comment -

          Hi akalash  thanks for your reminder!

          In our company, the adaptive partitioner is an optional optimization feature that is not enabled by default; it is usually enabled for jobs with heavy external IO access. The performance overhead of channel selection is acceptable compared to the backpressure and lag caused by high-load nodes, and it brings a positive gain of about 20%.

          I wrote a doc the other day, Adaptive Channel selection for partitioner. You are welcome to give some professional advice.

          Solution 1: simple to implement, but each selectChannel call needs to traverse the channels, so there is a certain performance overhead. [Already verified online; I am trying to measure the performance overhead at different parallelisms in a way similar to the benchmarks.] A rough sketch of this traversal is shown below.
          Solution 2: almost no performance overhead for channel selection, but it requires additional operations on the hot path and needs to operate within locks, so this solution would have to be treated more prudently and needs more discussion and validation.
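
          A rough sketch of the Solution 1 traversal mentioned above (illustration only; the per-subpartition backlog accessor is abstracted into a function, because the real accessor is exactly what is under discussion):

          import java.util.function.IntToLongFunction;

          public class GlobalLeastLoadedSelector {

              private final int numberOfSubpartitions;
              private final IntToLongFunction bytesInQueue; // hypothetical backlog accessor

              public GlobalLeastLoadedSelector(int numberOfSubpartitions, IntToLongFunction bytesInQueue) {
                  this.numberOfSubpartitions = numberOfSubpartitions;
                  this.bytesInQueue = bytesInQueue;
              }

              // Traverse every subpartition and pick the one with the smallest backlog.
              // This is O(numberOfSubpartitions) per record, which is the overhead
              // measured in the benchmarks later in this thread.
              public int selectChannel() {
                  long minBytes = Long.MAX_VALUE;
                  int candidate = 0;
                  for (int channel = 0; channel < numberOfSubpartitions; channel++) {
                      long bytes = bytesInQueue.applyAsLong(channel);
                      if (bytes < minBytes) {
                          minBytes = bytes;
                          candidate = channel;
                      }
                  }
                  return candidate;
              }
          }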

          I looked at your implementation of LoadBasedRecordWriter. If I haven't missed any details, SubpartitionStatistic is only updated at the time of emit, so this statistic does not represent the true computing power of the downstream operators. Maybe we want to achieve different goals; what I want to solve is avoiding the impact of high-load nodes on Flink job throughput.

          Looking forward to more feedback.

          pltbkd Gen Luo added a comment -

          Hi akalash, thanks for the reminder.
          I agree that the impact on the normal rebalance must be measured, though I wonder why the implementation in the pull request had a significant impact. Hopefully we can implement it in a more controllable way this time and make sure the impact is acceptable.

          On the other hand, in my opinion, if the adaptive rebalance causes only a small regression in performance, maybe we can make it 'suggested' or even the 'default', unless the users need all subtasks to process exactly the same number of records.

          Hi tartarus,
          Thanks for the doc! As we discussed offline, we'd better turn this issue into a FLIP and raise a formal discussion on the mailing list. As others mentioned, the feature was discussed before and faced quite a few problems. I think we need a formal proposal with some implementation plans, and then benchmark results from a POC version covering both the performance gain in applicable scenarios and the impact on the normal rebalance.

          pnowojski Piotr Nowojski added a comment -

          I think there are ways to address the problems with akalash's proposal without adding too much overhead (also checking getWritingThreadTotalNumberOfSentBytes for the channels that we haven't selected?). I think both of your proposed solutions would be quite costly, tartarus, adding extra synchronisation that we don't necessarily need given some clever heuristic.

          I agree with pltbkd, we would probably need a FLIP, as this will change public APIs (either changing the behaviour of the existing rebalance(), or adding a new one like loadRebalance()).


          akalash Anton Kalashnikov added a comment -

          > I wonder why the implementation in the pull request could impact significantly

          It wasn't a significant impact. As I remember, there were two sensitive microbenchmarks which showed a drop of about 2-3%. Maybe we could even have accepted it, but we never discussed it properly.

          > I looked at your implementation of LoadBasedRecordWriter. If I haven't missed any details, SubpartitionStatistic is only updated at the time of emit; this statistic does not represent the true computing power of downstream operators.

          Yes, you are absolutely right that it is updated only at the time of emit, which is not perfect. But it was just a small POC; I only tried to show that it is possible to do this without extra locks.

          The logic in my current PR:

          • During the emit we take `currentChannel` and `nextChannel` (currentChannel + 1)
          • If `bytesInQueue(currentChannel)` > `bytesInQueue(nextChannel)`, we set `nextChannel` as `currentChannel`, otherwise we leave `currentChannel` as is

          As was already noticed, the current solution can get stuck on one channel forever, since it never updates bytesInQueue for any channel but the current one.

          The improved logic can look like this:

          • We have `roundRobinChannel` (a channel index that is increased by 1 on every emit)
          • During the emit we increase `roundRobinChannel` by 1
          • We update `bytesInQueue` for `roundRobinChannel`
          • Now, if `bytesInQueue(currentChannel)` > `bytesInQueue(roundRobinChannel)`, we set `roundRobinChannel` as `currentChannel`, otherwise we leave `currentChannel` as is

          So if we have N channels, each channel will be refreshed on every N-th record, which I think is accurate enough for our goal. But it is not yet clear how to update `roundRobinChannel` cheaply. A possible sketch of this logic is shown below.
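
          A minimal sketch of one possible reading of this logic (the cached backlog array is my assumption about how the per-channel values could be kept; how to refresh them cheaply is exactly the open question above):

          import java.util.function.IntToLongFunction;

          public class RoundRobinProbingSelector {

              private final int numberOfChannels;
              private final IntToLongFunction liveBytesInQueue; // hypothetical, possibly expensive read
              private final long[] cachedBytesInQueue;          // last observed backlog per channel

              private int currentChannel;
              private int roundRobinChannel;

              public RoundRobinProbingSelector(int numberOfChannels, IntToLongFunction liveBytesInQueue) {
                  this.numberOfChannels = numberOfChannels;
                  this.liveBytesInQueue = liveBytesInQueue;
                  this.cachedBytesInQueue = new long[numberOfChannels];
              }

              public int selectChannel() {
                  // Probe one extra channel per emit, so every channel is refreshed every N records.
                  roundRobinChannel = (roundRobinChannel + 1) % numberOfChannels;
                  cachedBytesInQueue[roundRobinChannel] = liveBytesInQueue.applyAsLong(roundRobinChannel);

                  // Switch to the probed channel if it looks less loaded than the current one.
                  if (cachedBytesInQueue[currentChannel] > cachedBytesInQueue[roundRobinChannel]) {
                      currentChannel = roundRobinChannel;
                  }
                  return currentChannel;
              }
          }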

          Solution 1
          I still wonder about the implementation of getting the buffer queue sizes, since that is the tricky part. I solved this problem somehow in my PR, but there are still open questions there.
          As pnowojski noted, I also have concerns about the efficiency of this solution, since it is O(ChannelsCount), which doesn't look optimal. But we could just optimize that algorithm into something like what I described before, or similar.

          Solution 2
          I doubt that we will be ready for extra synchronization on the hot path, especially in code common to all partitioners. But of course, we can discuss it anyway, just with more specific implementation ideas.

          In conclusion, let's proceed with a FLIP. And we can think about:

          • How to effectively get channel queue size
          • How to effectively calculate the next channel
          tartarus tartarus added a comment -

          akalash pnowojski pltbkd Thank you very much for your suggestions and information, it was very valuable to me!

          I will prepare a FLIP as soon as possible. I tend to propose a new API for the adaptive partitioner, so as not to bring any impact on users' existing jobs; the adaptive partitioner would be an optional optimization attempt, and even though there is some performance overhead, users may be able to accept it.

          fanrui Rui Fan added a comment -

          Hi tartarus, some of our Flink scenarios also need the adaptive rebalance, for example: sinking data to HDFS, where some subtasks hit a slow datanode and Flink can forward data to idle subtasks instead.

          We have finished an internal version, and it works well. I read all the discussions in detail and I want to share our solution here. It's similar to your solution 1, but it solves the performance issue.

          We defined an option: dynamic-rebalance.max-traverse-size. It means we don't traverse all sub-partitions; we only traverse the next maxTraverseSize channels starting from the current channel. In general, we find the most idle channel from channel[currentChannel+1] to channel[currentChannel+maxTraverseSize].

          For example, with dynamic-rebalance.max-traverse-size=10, a sub-partition count of 100 and currentChannel=5: when sending a record, we find the most idle channel from channel-6 to channel-15 instead of among all channels, and mark that most idle channel as the currentChannel.

          Why do we choose this solution?

          1. In our production, only a small number of subtasks are very slow, while a large number of subtasks are healthy, and max-traverse-size=10 is enough to skip these slow subtasks.
          2. It solves the performance issue of solution 1.
          3. Unlike solution 2, it doesn't introduce an extra lock or a collection to maintain the idle channel set.
          4. We improved the max-traverse-size strategy: we choose a channel directly if we find that the pending size of any channel from channel[currentChannel+1] to channel[currentChannel+maxTraverseSize] is 0, because a channel with pending size 0 is the most idle channel.

          Here is the core code:

          private void chooseLessLoadedSubPartition() {
              long bytesInQueue = Long.MAX_VALUE;
              int candidateChannel = 0;
              for (int i = 1; i <= maxTraverseSize; i++) {
                  int channel = (currentChannel + i) % numberOfSubpartitions;
                  long bytesInQueueCurrent = targetPartition.getBytesInQueueUnsafe(channel);
          
                  if (bytesInQueueCurrent == 0) {
                      // If there isn't any pending data in the current channel, choose this channel
                      // directly.
                      currentChannel = channel;
                      return;
                  }
                  if (bytesInQueueCurrent < bytesInQueue) {
                      candidateChannel = channel;
                      bytesInQueue = bytesInQueueCurrent;
                  }
              }
              currentChannel = candidateChannel;
          } 

           

          Looking forward to your feedback and suggestion, thanks.

          tartarus tartarus added a comment - - edited

          fanrui  Thank you very much for sharing your solution! 

          Internally, I also optimized solution 1: instead of finding the globally optimal channel, it finds the optimal one among most of the channels.

           

          I did a performance test of the channel selection policies and collected the time taken for 10,000 calls, as follows:

          Time consumption in nanoseconds:

           

          (Rows are the number of subpartitions; each cell shows avg / P95 / P99.)

          | subpartitions | Rebalance  | Solution 1 (global optimal) | Optimized solution 1 (most of the best) | Traverse 5 channels | Traverse 10 channels | Traverse 20 channels |
          | ------------- | ---------- | --------------------------- | --------------------------------------- | ------------------- | -------------------- | -------------------- |
          | 10            | 68/90/102  | 203/372/1411                | 214/360/1652                            | 213/357/1268        | 226/440/1776         | 244/495/1706         |
          | 100           | 66/94/116  | 836/1285/3488               | 1223/1447/4509                          | 190/461/984         | 305/536/1601         | 298/421/973          |
          | 200           | 69/86/96   | 1152/2419/6547              | 1508/2613/5816                          | 169/290/941         | 227/477/1785         | 296/413/961          |
          | 300           | 68/104/199 | 1486/3672/9346              | 1808/3815/8756                          | 239/393/1221        | 297/520/1564         | 394/725/844          |
          | 500           | 68/89/99   | 1926/5105/12107             | 2124/5962/12336                         | 220/566/1240        | 243/507/1672         | 279/387/854          |
          | 800           | 74/118/131 | 2899/9285/21338             | 3048/9012/21896                         | 213/510/1100        | 220/472/1422         | 406/695/845          |
          | 1000          | 67/88/99   | 4569/13939/25574            | 3277/10599/28300                        | 209/478/1041        | 276/487/1443         | 360/499/1144         |
          | 2000          | 67/90/104  | 8023/7929/24980             | 7403/18832/37264                        | 230/638/1336        | 280/536/2042         | 335/478/1112         |
          | 3000          | 58/93/110  | 11701/12371/34079           | 10543/22218/43845                       | 190/472/964         | 254/535/1895         | 302/495/960          |
          | 5000          | 70/89/95   | 19364/19910/53594           | 15944/20266/62732                       | 183/388/945         | 254/495/1976         | 392/703/844          |
          | 8000          | 61/81/92   | 32792/42199/104582          | 28271/41126/54055                       | 178/516/978         | 249/472/1691         | 400/688/866          |
          | 10000         | 63/86/99   | 37397/39548/73330           | 36096/52062/82243                       | 224/511/1148        | 293/548/1631         | 343/681/1030         |

          The above table shows that the time complexity of Rebalance is O(1), while our optimization is O(n), which is consistent with expectations. We are constantly trying to reduce the n.

          With the `maxTraverseSize` solution, the user can select a value according to their actual situation and make the best trade-off!

          I suggest we use a combination of an option and an API to set `maxTraverseSize`.

          If maxTraverseSize is set through the API, it overrides the value of the option; if it is not set through the API, the value of the option is used. The API allows per-adaptiveRebalance granularity, while the option applies at Flink job granularity; in SQL jobs, the user can only set it through the option.

          The public API can be defined as follows:

           

          public DataStream<T> adaptiveRebalance(int maxTraverseSize) 
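
          Purely as an illustration of the precedence described above (the option key comes from the earlier comment; the default of 5 and the helper itself are assumptions):

          import java.util.Map;
          import java.util.OptionalInt;

          final class MaxTraverseSizeResolver {

              // The per-edge API value, if present, wins over the job-level option.
              static int resolve(OptionalInt apiValue, Map<String, String> jobConfig) {
                  if (apiValue.isPresent()) {
                      return apiValue.getAsInt(); // set via adaptiveRebalance(maxTraverseSize)
                  }
                  // Fall back to the job-level option (the only way to set it for SQL jobs).
                  return Integer.parseInt(
                          jobConfig.getOrDefault("dynamic-rebalance.max-traverse-size", "5"));
              }
          }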

           

           

          pnowojski akalash pltbkd   what do you think?

          Looking forward to your feedback and suggestion, thanks.

           

          fanrui Rui Fan added a comment -

          Hi tartarus , thanks for the hard testing.

          From the result, I think maxTraverseSize is a good choice! The time consumption doesn't increase when the subpartition number is high.

          I still have some questions:

          • What's the difference between solution 1 (Global Optimal) and optimized solution 1 (Most of the best)?
          • When maxTraverseSize > the subpartition number, maxTraverseSize should be capped at the subpartition number, for example maxTraverseSize = 20 with a subpartition number of 10.
          • Do we need to set maxTraverseSize through the API? The option is enough for most cases.
          • How do SQL jobs use the adaptiveRebalance? Rebalance should stay the default.

          Anyway, it's time to start a FLIP, and anyone can then discuss the adaptiveRebalance on the mailing list~

          wangm92 Matt Wang added a comment - - edited

          tartarus thanks for the hard testing too. Can you share how much this increase in time consumption affects the performance of the entire job?

          We also use a similar function internally, but the implementation is slightly different. To avoid a traversal on every operation in most cases, we limit the maximum buffersInBacklog for each channel (in extreme cases this can still lead to a global traversal); a rough sketch follows after this list:
          1. As long as the buffersInBacklog of the current channel does not exceed the threshold, use this channel directly; otherwise, search for the next channel;
          2. When processing the next record, the same judgment is made starting from lastChannelId + 1.
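
          A minimal sketch of this threshold-based variant, under my assumptions (the backlog accessor is hypothetical, and the worst case degenerates into a full traversal, as noted above):

          import java.util.function.IntToLongFunction;

          public class BacklogThresholdSelector {

              private final int numberOfChannels;
              private final long maxBuffersInBacklog;            // per-channel threshold
              private final IntToLongFunction buffersInBacklog;  // hypothetical accessor

              private int lastChannelId = -1;

              public BacklogThresholdSelector(
                      int numberOfChannels, long maxBuffersInBacklog, IntToLongFunction buffersInBacklog) {
                  this.numberOfChannels = numberOfChannels;
                  this.maxBuffersInBacklog = maxBuffersInBacklog;
                  this.buffersInBacklog = buffersInBacklog;
              }

              public int selectChannel() {
                  // Start from lastChannelId + 1, as in step 2 above.
                  int channel = (lastChannelId + 1) % numberOfChannels;
                  for (int i = 0; i < numberOfChannels; i++) {
                      if (buffersInBacklog.applyAsLong(channel) <= maxBuffersInBacklog) {
                          break; // below the threshold: use this channel directly
                      }
                      channel = (channel + 1) % numberOfChannels; // otherwise try the next one
                  }
                  lastChannelId = channel;
                  return channel;
              }
          }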

          Looking forward to the FLIP discussion

          pnowojski Piotr Nowojski added a comment - - edited

          I also like the idea of the maxTraverseSize option. I have a feeling that in the real world, values as small as 5 could be good enough. If not, and we really need values like ~20 or larger, we could also think about something like:

          • For N'th adaptiveRebalance call, where N % 10 == 1
            • iterate 20 channels
            • discard 10 worst channels, use 1st best channel
          • For N'th adaptiveRebalance call, where N % 10 != 1
            • use remaining 9 best channels from last iteration

          But I doubt it's really needed. Even with maxTraverseSize==5, we can always completely skip up to 4 slow channels if they are completely bottlenecked and bundled together. If more slow channels are bundled together, those channels would only get 20% of the load compared to the fast channels. So this solution would still work perfectly fine as long as the "slow" machine/machines are up to 5x slower than the normal machines.

          I'm also looking forward to the FLIP. If I haven't made a mistake in the previous paragraph, I think it would be good to add it to the FLIP as a justification for why we don't need a large maxTraverseSize.

          Btw, before publishing a FLIP, could you run the mapRebalanceMapSink.F27_UNBOUNDED benchmark from flink-benchmarks (you can check the readme, and especially the general remarks) and compare the normal rebalance with your adaptive one? This benchmark is defined in the src/main/java/org/apache/flink/benchmark/InputBenchmark.java file.

          tartarus tartarus added a comment -

          Hi fanrui, thank you for your reply. I will answer your concerns one by one.

          >What's the difference between solution 1(Global Optimal) and optimized solution 1(Most of the best)?

          Solution 1 (Global Optimal): every record traverses all subpartitions to find the best channel, so the more subpartitions there are, the more time it takes!

          Optimized solution 1 (Most of the best): when a record comes in, suppose it should be written to subpartition x. If index x is in the first half of the subpartition array, we find the optimal subpartition index from x to the end of the array; if index x is in the second half, we find the optimal subpartition index from 0 to x. A rough sketch of this is shown below.
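
          A rough sketch of this half-range traversal (illustration only; the backlog accessor is hypothetical, and x is the index that plain round-robin would have chosen):

          import java.util.function.IntToLongFunction;

          public class HalfRangeLeastLoadedSelector {

              private final int numberOfSubpartitions;
              private final IntToLongFunction bytesInQueue; // hypothetical backlog accessor

              public HalfRangeLeastLoadedSelector(int numberOfSubpartitions, IntToLongFunction bytesInQueue) {
                  this.numberOfSubpartitions = numberOfSubpartitions;
                  this.bytesInQueue = bytesInQueue;
              }

              // Scan either [x, end) or [0, x] depending on which half x falls into,
              // instead of scanning all subpartitions.
              public int selectChannel(int x) {
                  int from = x < numberOfSubpartitions / 2 ? x : 0;
                  int to = x < numberOfSubpartitions / 2 ? numberOfSubpartitions : x + 1;

                  long minBytes = Long.MAX_VALUE;
                  int candidate = from;
                  for (int channel = from; channel < to; channel++) {
                      long bytes = bytesInQueue.applyAsLong(channel);
                      if (bytes < minBytes) {
                          minBytes = bytes;
                          candidate = channel;
                      }
                  }
                  return candidate;
              }
          }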

          >When maxTraverseSize > the subpartition number, the maxTraverseSize should be the subpartition number. For example,  maxTraverseSize = 20, and the subpartition number is 10.

          Yes, I have taken this case into account when testing.

          >How the SQL job to use the adaptiveRebalance? The rebalance should be the default.

          If we introduce the new adaptiveRebalance API, how will SQL jobs use it? Or will SQL jobs simply not support adaptiveRebalance for now?

           

          I will refer to everyone's comments to complete the FLIP.

          tartarus tartarus added a comment -

          Hi wangm92, thank you for sharing; your solution sounds similar to fanrui's.

          I tested solution 1 (global optimum) with a parallelism of 100; throughput was roughly 40% lower (without any business logic, i.e. purely testing channel selection performance).

          So I think maxTraverseSize is a good solution.

          tartarus tartarus added a comment -

          Hi pnowojski  thank you for your reply.

          I currently tend to implement it with maxTraverseSize==5; we can discuss more complex algorithmic strategies later. I will follow your suggestion and use the flink-benchmarks to test it.

          The FLIP will come soon.


          People

            Assignee: tartarus
            Reporter: tartarus
            Votes: 0
            Watchers: 17

            Dates

              Created:
              Updated: