Details
- Type: Improvement
- Status: Resolved
- Priority: P2
- Resolution: Fixed
- Fix Version/s: Not applicable
Description
Hi,
I'm using the JdbcIO reader in batch mode with the PostgreSQL driver.
My select query returns more than 5 million rows, read using cursors via Statement.setFetchSize().
These ParDos are OK:
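For reference, newer Beam releases let you set the fetch size on JdbcIO directly instead of configuring the driver yourself. A minimal sketch, assuming a hypothetical MyRow element type and placeholder connection details:

    // Sketch only: the JDBC URL, query, and MyRow type are placeholders.
    PCollection<MyRow> rows = pipeline.apply(
        JdbcIO.<MyRow>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://host/db"))
            .withQuery("SELECT id FROM big_table")
            // Stream results through a cursor instead of buffering all rows.
            .withFetchSize(10_000)
            .withRowMapper((JdbcIO.RowMapper<MyRow>) rs -> new MyRow(rs.getLong(1)))
            .withCoder(SerializableCoder.of(MyRow.class)));

Note that withFetchSize was added to JdbcIO in a later Beam version than the one this issue was filed against.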
.apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
.apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
  private Random random;

  @Setup
  public void setup() {
    random = new Random();
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    context.output(KV.of(random.nextInt(), context.element()));
  }
}))
But the reshuffle is very, very slow. It must be the GroupByKey with more than 5 million keys:
.apply(GroupByKey.<Integer, T>create())
Is there a way to optimize the reshuffle, or another method to prevent fusion?
thanks in advance,
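[Editor's note: one alternative worth considering, not something the reporter tested, is Beam's built-in Reshuffle transform, which performs the random-key assignment and grouping internally and so replaces the manual ParDo + GroupByKey pair above. The underlying shuffle cost is similar, but the pipeline code is simpler:]

    // Replaces the manual random-key ParDo followed by GroupByKey.
    // Requires a Beam version that provides Reshuffle.viaRandomKey().
    PCollection<T> reshuffled = input
        .apply(Reshuffle.viaRandomKey());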
Edit:
I added some tests.
I use Google Dataflow as the runner, with 1 worker (2 max), workerMachineType n1-standard-2, and autoscalingAlgorithm THROUGHPUT_BASED.
Rates observed (see attached screenshots):

  Rows returned | ParDo(Read) rate | GroupByKey rate
  500,000       | ~1300 r/s        | ~1080 r/s
  1,000,000     | ~1480 r/s        | ~634 r/s
  1,500,000     | ~1700 r/s        | ~565 r/s
  2,000,000     | ~1485 r/s        | ~537 r/s

As we can see, the GroupByKey rate decreases as the number of records grows.
PS: the 2nd worker starts just after ParDo(Read) succeeds.
Attachments