Beam / BEAM-2803

JdbcIO read is very slow when the query returns a lot of rows

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: Not applicable
    • Fix Version/s: 2.2.0
    • Component/s: io-java-jdbc

      Description

Hi,

I'm using the JdbcIO reader in batch mode with the PostgreSQL driver.
My select query returns more than 5 million rows,
read through cursors with Statement.setFetchSize().
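
For context on the cursor setup (a minimal sketch, not taken from this report): with the PostgreSQL JDBC driver, setFetchSize() only streams rows through a server-side cursor when auto-commit is disabled; otherwise the driver buffers the entire result set in memory. The class name, connection handling, and default fetch size below are hypothetical:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch: cursor-based streaming with the PostgreSQL JDBC driver.
// Rows are fetched in batches of `fetchSize` instead of all at once,
// which matters when a query returns millions of rows.
public class CursorRead {

  // Pure helper: the driver ignores non-positive fetch sizes, so fall
  // back to a default (50_000 is an arbitrary, hypothetical choice).
  static int effectiveFetchSize(int requested) {
    return requested > 0 ? requested : 50_000;
  }

  static void readWithCursor(Connection conn, String query, int fetchSize) throws Exception {
    // Required: the pg driver only uses a server-side cursor when
    // auto-commit is off and the fetch size is positive.
    conn.setAutoCommit(false);
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setFetchSize(effectiveFetchSize(fetchSize));
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          // Process one row at a time without holding all rows in memory.
        }
      }
    }
  }
}
```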

These ParDos are OK:

          .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
          .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
            private Random random;

            @Setup
            public void setup() {
              random = new Random();
            }

            @ProcessElement
            public void processElement(ProcessContext context) {
              context.output(KV.of(random.nextInt(), context.element()));
            }
          }))


But the reshuffle is very, very slow.
It must be the GroupByKey with more than 5 million keys.

      .apply(GroupByKey.<Integer, T>create())
      

Is there a way to optimize the reshuffle, or another method to prevent fusion?

Thanks in advance,
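
One direction worth noting (an illustrative sketch, not part of this report): because the DoFn above draws keys from the full int range via random.nextInt(), a 5-million-row input produces roughly 5 million distinct keys, so the GroupByKey degenerates into millions of single-element groups. Bounding the key space to a small fixed number of shards keeps the fusion break while giving the grouping far fewer keys. The class and NUM_SHARDS constant below are hypothetical, and plain JDK collections stand in for the Beam transforms:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Sketch: assign each element a key drawn from a small, fixed key space
// (NUM_SHARDS) instead of the full int range. A GroupByKey over these
// keys then yields at most NUM_SHARDS groups, not millions of
// single-element groups.
public class BoundedKeys {
  static final int NUM_SHARDS = 16; // hypothetical tuning knob

  static Map<Integer, List<String>> groupByBoundedKey(List<String> rows, long seed) {
    Random random = new Random(seed);
    Map<Integer, List<String>> groups = new HashMap<>();
    for (String row : rows) {
      int key = random.nextInt(NUM_SHARDS); // bounded key space
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < 100_000; i++) {
      rows.add("row-" + i);
    }
    Map<Integer, List<String>> groups = groupByBoundedKey(rows, 42L);
    // At most NUM_SHARDS distinct keys, regardless of row count,
    // and no element is lost by the regrouping.
    if (groups.size() > NUM_SHARDS) throw new AssertionError();
    int total = groups.values().stream().mapToInt(List::size).sum();
    if (total != rows.size()) throw new AssertionError();
    System.out.println("groups=" + groups.size() + " total=" + total);
  }
}
```

In Beam itself, the Reshuffle transform exists for exactly this kind of fusion break; whether it helps here depends on the runner's shuffle implementation.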

Edit:
I added some tests.
I use Google Dataflow as the runner, with 1 worker (2 max), workerMachineType n1-standard-2,
and autoscalingAlgorithm THROUGHPUT_BASED.

Rates observed (see attached screenshots):

  Rows returned   ParDo(Read)   GroupByKey
  500,000         ~1,300 r/s    ~1,080 r/s
  1,000,000       ~1,480 r/s    ~634 r/s
  1,500,000       ~1,700 r/s    ~565 r/s
  2,000,000       ~1,485 r/s    ~537 r/s

As we can see, the GroupByKey rate decreases as the number of records grows.

PS: the 2nd worker starts just after ParDo(Read) succeeds.

        Attachments

        1. test500k.png
          37 kB
          Jérémie Vexiau
        2. test2M.jpg
          26 kB
          Jérémie Vexiau
        3. test1M.png
          40 kB
          Jérémie Vexiau
        4. test1500K.png
          43 kB
          Jérémie Vexiau


              People

• Assignee: jkff (Eugene Kirpichov)
• Reporter: jevex (Jérémie Vexiau)
• Votes: 0
• Watchers: 4
