Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Description
In Flink, if the parallelism of the upstream and downstream operators differs, the RebalancePartitioner is used by default to select the target channel.
In our company, users often use Flink to access Redis, HBase, or other RPC services. If some of the operators are slow to return requests (for external-service reasons), the job backpressures easily, because Rebalance/Rescale use a round-robin channel selection policy.
Because the Rebalance/Rescale policy does not care which downstream subtask the data is sent to, we would like Rebalance/Rescale to take the processing capacity of the downstream subtasks into account when choosing a channel.
Sending more data to idle subtasks ensures the best possible throughput for the job!
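For reference, the round-robin selection that Rebalance/Rescale perform boils down to the following simplified sketch (not the actual RebalancePartitioner source, which also randomizes the starting channel):
```java
// Simplified sketch of round-robin channel selection: every channel receives
// 1/N of the records, regardless of how backlogged its downstream subtask is.
class RoundRobinChannelSelector {
    private final int numberOfChannels;
    private int nextChannelToSendTo = -1;

    RoundRobinChannelSelector(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }
}
```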
Activity
Hi tartarus, thanks for the good proposal; it's really a useful feature for Flink users. And I see it has been assigned to you.
Would you mind using a Google doc to sort out the design first? It should include:
- Motivation
- User interface: how do Flink users use this feature?
- Judgment of pressure: how do we judge the downstream channel pressure?
- Rule for picking a channel: how do we pick a channel?
Hi tartarus, thanks for the proposal; it also looks useful from my side.
There has also been some discussion about it in the community previously, and some points were mentioned, like that we might not want to introduce new locks, and that we should design the structure that maintains the active channels carefully to avoid additional overhead. Thus I also think that, if convenient, you could propose a design doc so that we can first reach consensus on the overall design.
Also cc pltbkd, who has also implemented the functionality previously in an internal version.
Hi gaoyunhaii, fanrui,
Thanks for your suggestions. I will produce a design doc as soon as possible.
Hi tartarus, thanks for this discussion.
I think I remember the same discussion which gaoyunhaii mentioned above. I just want to add that I tried to implement it as well; you can take a look at it here - https://github.com/apache/flink/pull/16224. My solution works without any extra synchronization or locks, but the performance of the other (non-adaptive) rebalances was impacted anyway. I changed only `BufferWritingResultPartition#emitRecord` and added a non-volatile variable to `PipelinedSubpartition`, which was enough for a small degradation in the benchmark, since it is a hot path.
I don't remember exactly why we stopped discussing it, but we should be careful about it: adaptive rebalance is not such a common use case, and it would be unfortunate if we slowed down the more common rebalances in favor of this less common one.
Hi akalash, thanks for your reminder!
In our company, the adaptive partitioner is an optional optimization feature that is not enabled by default; it is usually enabled for jobs with heavy external IO access. The performance overhead of selecting a channel is acceptable compared to the backpressure and lag caused by highly loaded nodes, and it can bring a positive gain of about 20%.
I wrote a doc the other day, Adaptive Channel selection for partitioner. You are welcome to give some professional advice.
Solution 1: simple to implement, but each selectChannel needs to traverse the channels, so there will be a certain performance overhead. [Already verified online; I'm trying to measure the performance overhead at different parallelisms in a way similar to a benchmark.]
Solution 2: almost no performance overhead for selecting a channel, but it requires additional operations on the hot path and needs to operate within locks; this solution calls for more prudence and requires more discussion and validation.
I looked at your implementation of LoadBasedRecordWriter. If I haven't missed any details, SubpartitionStatistic is only updated at the time of emit, so this statistic does not represent the true computing power of the downstream operators. Maybe we want to achieve different goals; what I want to solve is avoiding the impact of highly loaded nodes on Flink job throughput.
Looking forward to more feedback.
Hi akalash, thanks for the reminder.
I agree that the impact on the normal rebalance must be measured, though I wonder why the implementation in the pull request could have such a significant impact. Hopefully we can implement it this time in a more controllable way and make sure the impact is acceptable.
On the other hand, in my opinion, if adaptive rebalance causes little regression to performance, maybe we can make it 'suggested' or even 'default', unless the users need all subtasks to process exactly the same number of records.
Hi tartarus,
Thanks for the doc! As we discussed offline, we'd better turn this issue into a FLIP and raise a formal discussion on the mailing list. As others mentioned, the feature was discussed before and faced quite some problems. I think we need a formal proposal with some implementation plans, and then provide benchmark results with a POC version, including the performance gain in applicable scenarios and the impact on the normal rebalance.
I think there are ways to address the problems with akalash's proposal without adding too much overhead (also checking getWritingThreadTotalNumberOfSentBytes for the channels that we haven't selected?). I think both of your proposed solutions would be quite costly, tartarus, adding extra synchronisation that we don't necessarily need given some clever heuristic.
I agree with pltbkd; we would probably need a FLIP, as this will change public APIs (either changing the behaviour of the existing rebalance(), or adding a new one like loadRebalance()).
>I wonder why the implementation in the pull request could have such a significant impact
It wasn't a significant impact. As I remember, there were two sensitive microbenchmarks which showed a drop of about 2-3%. Maybe we could even have accepted it, but we never discussed it properly.
>I looked at your implementation of LoadBasedRecordWriter. If I haven't missed any details, SubpartitionStatistic is only updated at the time of emit, so this statistic does not represent the true computing power of the downstream operators
Yes, you are absolutely right that it is updated only at the time of emit, which is not perfect. But it was my small POC; I just wanted to show that it is possible to do this without extra locks.
The logic in my current PR:
- During the emit we take `currentChannel` and `nextChannel` (currentChannel + 1)
- If `bytesInQueue(currentChannel)` > `bytesInQueue(nextChannel)`, then we set `nextChannel` as `currentChannel`; otherwise we leave `currentChannel` as is
As was already noticed, the current solution can get stuck on one channel forever, since it never updates bytesInQueue for any channel but the current one.
The improved logic can look like this:
- We have `roundRobinChannel` (a channel index which is increased by 1 on every emit)
- During the emit we increase `roundRobinChannel` by 1
- We update `bytesInQueue` for `roundRobinChannel`
- Now, if `bytesInQueue(currentChannel)` > `bytesInQueue(roundRobinChannel)`, then we set `roundRobinChannel` as `currentChannel`; otherwise we leave `currentChannel` as is
So if we have N channels, each channel will be updated on every N-th record, which I think is accurate enough for our goal. But it is not yet clear how to update `roundRobinChannel` cheaply.
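A minimal sketch of that improved logic (the `bytesInQueue` array below is a placeholder for whatever cheap, lock-free per-channel backlog counter the subpartitions would expose; it is not an existing Flink API):
```java
// Sketch of the improved selection logic described above. The backlog array
// is assumed to be updated elsewhere by the subpartitions.
class ImprovedAdaptiveChannelSelector {
    private final long[] bytesInQueue; // per-channel backlog, refreshed externally
    private int currentChannel;
    private int roundRobinChannel;

    ImprovedAdaptiveChannelSelector(int numberOfChannels) {
        this.bytesInQueue = new long[numberOfChannels];
    }

    int selectChannel() {
        // Advance the round-robin pointer: with N channels, every channel's
        // backlog gets compared against the current one once every N emits,
        // so the selector cannot get stuck on a single channel.
        roundRobinChannel = (roundRobinChannel + 1) % bytesInQueue.length;
        if (bytesInQueue[currentChannel] > bytesInQueue[roundRobinChannel]) {
            currentChannel = roundRobinChannel;
        }
        return currentChannel;
    }
}
```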
Solution 1
I still wonder about the implementation of getting the buffer queue sizes, since that is the tricky part. I solved this problem somehow in my PR, but it is still an open question.
Like pnowojski, I also have concerns about the efficiency of this solution, since it is O(ChannelsCount), which doesn't look optimal. But we could optimize the algorithm to something like what I described above, or similar.
Solution 2
I doubt that we will be ready for extra synchronization on the hot path, especially in code common to all partitioners. But of course we can discuss it anyway, though only with more specific implementation ideas.
In conclusion, let's proceed with a FLIP. And we can think about:
- How to effectively get the channel queue size
- How to effectively calculate the next channel
akalash pnowojski pltbkd Thank you very much for your suggestions and information; they were very valuable to me!
I will prepare a FLIP as soon as possible. I tend to propose a new API for the adaptive partitioner, so as not to impact users' existing jobs; the adaptive partitioner would be an optional optimization attempt, and even though there is some performance overhead, users may be able to accept it.
Hi tartarus, some of our Flink scenarios also need adaptive rebalance. For example, when sinking data to HDFS, some subtasks may hit a slow DataNode, and Flink can forward the data to idle subtasks.
We have finished an internal version, and it works well. I read all the discussions in detail and I want to share our solution here. It's similar to your solution 1, and it solves the performance issue.
We defined an option: dynamic-rebalance.max-traverse-size. It means we don't traverse all sub-partitions; we only traverse the next maxTraverseSize channels after the current channel. In general, we find the most idle channel among channel[currentChannel+1] to channel[currentChannel+maxTraverseSize].
For example, with dynamic-rebalance.max-traverse-size=10, a sub-partition count of 100, and currentChannel=5: when sending a record, we find the most idle channel among channel-6 to channel-15 instead of among all channels, and mark the most idle channel as the currentChannel.
Why do we choose this solution?
- In our production, only a small number of subtasks are very slow, while the large majority are healthy, and max-traverse-size=10 is enough to skip the slow subtasks.
- It solves the performance issue of solution 1.
- Unlike solution 2, it doesn't introduce an extra lock or a collection to maintain the set of idle channels.
- We improved the max-traverse-size strategy: we choose a channel directly if we find that the pending size of any channel among channel[currentChannel+1] to channel[currentChannel+maxTraverseSize] is 0, because a channel with pending size 0 is the most idle channel.
Here is the core code:
```java
private void chooseLessLoadedSubPartition() {
    long bytesInQueue = Long.MAX_VALUE;
    int candidateChannel = 0;
    for (int i = 1; i <= maxTraverseSize; i++) {
        int channel = (currentChannel + i) % numberOfSubpartitions;
        long bytesInQueueCurrent = targetPartition.getBytesInQueueUnsafe(channel);
        if (bytesInQueueCurrent == 0) {
            // If there isn't any pending data in this channel, choose it directly.
            currentChannel = channel;
            return;
        }
        if (bytesInQueueCurrent < bytesInQueue) {
            candidateChannel = channel;
            bytesInQueue = bytesInQueueCurrent;
        }
    }
    currentChannel = candidateChannel;
}
```
Looking forward to your feedback and suggestions, thanks.
fanrui Thank you very much for sharing your solution!
Internally, I also optimized solution 1: instead of finding the globally optimal channel, I find the optimal one among most of the channels.
I ran a performance test of the channel selection policies and collected the time taken over 10,000 calls, as follows.
Time consumption in nanoseconds, by subpartition number.

Average:

| subpartitions | Rebalance | solution 1 (Global Optimal) | optimized solution 1 (Most of the best) | Traverse 5 channels | Traverse 10 channels | Traverse 20 channels |
|---|---|---|---|---|---|---|
| 10 | 68 | 203 | 214 | 213 | 226 | 244 |
| 100 | 66 | 836 | 1223 | 190 | 305 | 298 |
| 200 | 69 | 1152 | 1508 | 169 | 227 | 296 |
| 300 | 68 | 1486 | 1808 | 239 | 297 | 394 |
| 500 | 68 | 1926 | 2124 | 220 | 243 | 279 |
| 800 | 74 | 2899 | 3048 | 213 | 220 | 406 |
| 1000 | 67 | 4569 | 3277 | 209 | 276 | 360 |
| 2000 | 67 | 8023 | 7403 | 230 | 280 | 335 |
| 3000 | 58 | 11701 | 10543 | 190 | 254 | 302 |
| 5000 | 70 | 19364 | 15944 | 183 | 254 | 392 |
| 8000 | 61 | 32792 | 28271 | 178 | 249 | 400 |
| 10000 | 63 | 37397 | 36096 | 224 | 293 | 343 |

P95:

| subpartitions | Rebalance | solution 1 (Global Optimal) | optimized solution 1 (Most of the best) | Traverse 5 channels | Traverse 10 channels | Traverse 20 channels |
|---|---|---|---|---|---|---|
| 10 | 90 | 372 | 360 | 357 | 440 | 495 |
| 100 | 94 | 1285 | 1447 | 461 | 536 | 421 |
| 200 | 86 | 2419 | 2613 | 290 | 477 | 413 |
| 300 | 104 | 3672 | 3815 | 393 | 520 | 725 |
| 500 | 89 | 5105 | 5962 | 566 | 507 | 387 |
| 800 | 118 | 9285 | 9012 | 510 | 472 | 695 |
| 1000 | 88 | 13939 | 10599 | 478 | 487 | 499 |
| 2000 | 90 | 7929 | 18832 | 638 | 536 | 478 |
| 3000 | 93 | 12371 | 22218 | 472 | 535 | 495 |
| 5000 | 89 | 19910 | 20266 | 388 | 495 | 703 |
| 8000 | 81 | 42199 | 41126 | 516 | 472 | 688 |
| 10000 | 86 | 39548 | 52062 | 511 | 548 | 681 |

P99:

| subpartitions | Rebalance | solution 1 (Global Optimal) | optimized solution 1 (Most of the best) | Traverse 5 channels | Traverse 10 channels | Traverse 20 channels |
|---|---|---|---|---|---|---|
| 10 | 102 | 1411 | 1652 | 1268 | 1776 | 1706 |
| 100 | 116 | 3488 | 4509 | 984 | 1601 | 973 |
| 200 | 96 | 6547 | 5816 | 941 | 1785 | 961 |
| 300 | 199 | 9346 | 8756 | 1221 | 1564 | 844 |
| 500 | 99 | 12107 | 12336 | 1240 | 1672 | 854 |
| 800 | 131 | 21338 | 21896 | 1100 | 1422 | 845 |
| 1000 | 99 | 25574 | 28300 | 1041 | 1443 | 1144 |
| 2000 | 104 | 24980 | 37264 | 1336 | 2042 | 1112 |
| 3000 | 110 | 34079 | 43845 | 964 | 1895 | 960 |
| 5000 | 95 | 53594 | 62732 | 945 | 1976 | 844 |
| 8000 | 92 | 104582 | 54055 | 978 | 1691 | 866 |
| 10000 | 99 | 73330 | 82243 | 1148 | 1631 | 1030 |
The tables above show that the time complexity of Rebalance is O(1), while our optimization is O(n), which is consistent with expectations. We are constantly trying to reduce the n.
With the `maxTraverseSize` solution, users can choose the value according to their actual situation to make the best trade-off.
I suggest we use a combination of an option and an API to set `maxTraverseSize`: if maxTraverseSize is set through the API, it overrides the value of the option; if it is not set through the API, the value of the option is used. The API works at the granularity of each adaptiveRebalance, while the option works at the granularity of the whole Flink job; in SQL jobs, the user can only set it through the option.
The public API could be defined as follows:
```java
public DataStream<T> adaptiveRebalance(int maxTraverseSize)
```
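For illustration, a hypothetical DataStream job combining the option and the API could look like this (adaptiveRebalance() does not exist in Flink today, and the option key follows the proposal above):
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveRebalanceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job-level default; also the only way to configure SQL jobs.
        conf.setString("dynamic-rebalance.max-traverse-size", "10");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromSequence(0, 1_000_000)
                // Hypothetical per-exchange API from this proposal,
                // overriding the job-level option above.
                .adaptiveRebalance(5)
                .print();

        env.execute("adaptive-rebalance-demo");
    }
}
```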
pnowojski akalash pltbkd what do you think?
Looking forward to your feedback and suggestions, thanks.
Hi tartarus, thanks for the thorough testing.
From the results, I think maxTraverseSize is a good choice! The time consumption doesn't increase even when the subpartition number is high.
I still have some questions:
- What's the difference between solution 1(Global Optimal) and optimized solution 1(Most of the best)?
- When maxTraverseSize > the subpartition number, maxTraverseSize should be capped at the subpartition number. For example, maxTraverseSize = 20 while the subpartition number is 10.
- Do we need to set maxTraverseSize through the API? The option should be enough for most cases.
- How do SQL jobs use adaptiveRebalance? Plain rebalance should remain the default.
Anyway, it's time to start a FLIP, and anyone can discuss adaptiveRebalance on the mailing list~
tartarus thanks for the thorough testing too. Can you share how much this increase in time consumption affects the performance of the entire job?
We also use a similar feature internally, but the implementation is slightly different. To avoid traversing on every operation in most cases, we limit the maximum buffersInBacklog for each channel (in extreme cases this can still lead to a global traversal):
1. As long as the buffersInBacklog of the current channel does not exceed the threshold, use this channel directly; otherwise, search for the next channel.
2. When processing the next record, the same check is applied starting from lastChannelId + 1, as sketched below.
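A rough sketch of this strategy (identifiers like maxBuffersInBacklog and the backlog array are illustrative placeholders, not existing Flink APIs):
```java
// Illustrative sketch of the threshold-based selection described above; the
// backlog array stands in for whatever per-channel counter the real
// implementation reads.
class BacklogThresholdChannelSelector {
    private final int numberOfChannels;
    private final int maxBuffersInBacklog; // configured threshold
    private final int[] buffersInBacklog;  // per-channel backlog, updated elsewhere
    private int lastChannelId;

    BacklogThresholdChannelSelector(int numberOfChannels, int maxBuffersInBacklog) {
        this.numberOfChannels = numberOfChannels;
        this.maxBuffersInBacklog = maxBuffersInBacklog;
        this.buffersInBacklog = new int[numberOfChannels];
    }

    int selectChannel() {
        // Start from lastChannelId + 1 and take the first channel whose
        // backlog is under the threshold; in the extreme case where every
        // channel is over the threshold, this degenerates into a full scan.
        for (int i = 1; i <= numberOfChannels; i++) {
            int channel = (lastChannelId + i) % numberOfChannels;
            if (buffersInBacklog[channel] <= maxBuffersInBacklog) {
                lastChannelId = channel;
                return channel;
            }
        }
        // All channels exceed the threshold: fall back to plain round-robin.
        lastChannelId = (lastChannelId + 1) % numberOfChannels;
        return lastChannelId;
    }
}
```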
Looking forward to the FLIP discussion
I also like the idea of the maxTraverseSize option. I have a feeling that in the real world, values as small as 5 could be good enough. If not, and we really need values like ~20 or larger, we could also think about something like:
- For N'th adaptiveRebalance call, where N % 10 == 1
- iterate 20 channels
- discard 10 worst channels, use 1st best channel
- For N'th adaptiveRebalance call, where N % 10 != 1
- use remaining 9 best channels from last iteration
But I doubt it's really needed. Even with maxTraverseSize==5, we can always completely skip up to 4 slow channels if they are completely bottlenecked and bundled together. If we have more slow channels bundled together, those channels would only get 20% of the load compared to the fast channels. So this solution would still work perfectly fine if the "slow" machine/machines are up to 5x slower compared to normal machines.
I'm also looking forward to the FLIP. If I haven't made a mistake in the previous paragraph, I think it would be good to add it to the FLIP as justification for why we don't need a large maxTraverseSize.
Btw, before publishing a FLIP, could you run the mapRebalanceMapSink.F27_UNBOUNDED benchmark from flink-benchmarks (you can check the readme and especially the general remarks) and compare the normal rebalance with your adaptive one? This benchmark is defined in the src/main/java/org/apache/flink/benchmark/InputBenchmark.java file.
Hi fanrui, thank you for your reply. I will answer your concerns one by one.
>What's the difference between solution 1(Global Optimal) and optimized solution 1(Most of the best)?
Solution 1 (Global Optimal) traverses all subpartitions for every record to find the best channel, so the more subpartitions there are, the more time it takes!
Optimized solution 1 (Most of the best): when a record comes in, suppose it should be written to subpartition x. If index x is in the first half of the subpartition array, we find the optimal subpartition index from x to the end of the array; if index x is in the second half, we find the optimal subpartition index from 0 to x.
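A sketch of that half-array search (the bytesInQueue array is an illustrative placeholder for the per-channel backlog):
```java
// Illustrative sketch of "optimized solution 1": instead of scanning all
// subpartitions for every record, scan only the half of the array that the
// incoming index x falls into.
static int selectMostOfTheBest(int x, long[] bytesInQueue) {
    int n = bytesInQueue.length;
    // First half: search [x, n); second half: search [0, x].
    int from = (x < n / 2) ? x : 0;
    int to = (x < n / 2) ? n : x + 1;
    int best = from;
    for (int i = from; i < to; i++) {
        if (bytesInQueue[i] < bytesInQueue[best]) {
            best = i;
        }
    }
    return best;
}
```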
>When maxTraverseSize > the subpartition number, maxTraverseSize should be capped at the subpartition number. For example, maxTraverseSize = 20 while the subpartition number is 10.
Yes, I have taken this case into account in the tests.
>How do SQL jobs use adaptiveRebalance? Plain rebalance should remain the default.
If we introduce the new API adaptiveRebalance, how will SQL jobs use it? Or will SQL jobs simply not support adaptiveRebalance for now?
I will incorporate everyone's comments to complete the FLIP.
Hi wangm92, thank you for sharing; your solution sounds similar to fanrui's.
I tested solution 1 (global optimum) with a parallelism of 100; throughput was roughly 40% lower (without any business logic, purely testing select-channel performance).
So I think maxTraverseSize is a good solution.
Hi pnowojski, thank you for your reply.
I currently tend to use maxTraverseSize==5 for the implementation; we can discuss more complex algorithmic strategies later. I will take your suggestion and test it with the Flink benchmarks.
The FLIP will come soon.
gaoyunhaii fanrui Please take a look at this issue; after our company adopted the adaptive Rebalance/Rescale, job throughput improved by about 20% on average compared to the community version.
We can discuss how we can contribute this feature to the community!
Please assign it to me, thanks~