  1. Beam
  2. BEAM-9228

_SDFBoundedSourceWrapper doesn't distribute data to multiple workers


    Details

    • Type: Bug
    • Status: Closed
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.16.0, 2.18.0, 2.19.0
    • Fix Version/s: 2.20.0
    • Component/s: sdk-py-core
    • Labels:
      None

      Description

      A user reported the following issue.

      -------------------------------------------------
      I have a set of tfrecord files, obtained by converting parquet files with Spark. Each file is roughly 1GB and I have 11 of those.

      I would expect simple statistics gathering (i.e., counting the number of items across all files) to scale linearly with the number of cores on my system.

      I am able to reproduce the issue with the minimal snippet below:

      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions
      from apache_beam.runners.portability import fn_api_runner
      from apache_beam.portability.api import beam_runner_api_pb2
      from apache_beam.portability import python_urns
      import sys
      
      pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
      
      file_pattern = 'part-r-00*'
      runner=fn_api_runner.FnApiRunner(
                default_environment=beam_runner_api_pb2.Environment(
                    urn=python_urns.SUBPROCESS_SDK,
                    payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                              % sys.executable.encode('ascii')))
      
      p = beam.Pipeline(runner=runner, options=pipeline_options)
      
      lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
                       | beam.combiners.Count.Globally()
                       | beam.io.WriteToText('/tmp/output'))
      
      p.run()
      

      Only one combination of apache_beam revision and worker type seems to work (see https://beam.apache.org/documentation/runners/direct/ for the worker types):

      • beam 2.16: neither the multithreaded nor the multiprocess mode achieves high CPU usage on multiple cores
      • beam 2.17: able to achieve high CPU usage on all 4 cores
      • beam 2.18: I have not tested the multithreaded mode, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change between 2.17 and 2.18

      I also briefly tried the SparkRunner with version 2.16 but was not able to achieve any throughput.

      What is the recommended way to achieve what I am trying to do? How can I troubleshoot this?
      -------------------------------------------------

      This is caused by this PR.

      A workaround was tried: rolling back iobase.py so that it does not use _SDFBoundedSourceWrapper. This confirmed that data is then distributed to multiple workers; however, the rollback causes some regressions in the SDF wrapper tests.
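
      When troubleshooting a report like this one, a baseline outside Beam can help show what per-file parallelism looks like on the same machine. The sketch below is only an illustration under stated assumptions, not Beam code and not the fix: it counts lines in plain files as a stand-in for TFRecord entries, and the names count_records and parallel_count, as well as the mode strings, are hypothetical. It returns the total count together with the set of workers (process ID and thread ID) that handled at least one file.

```python
import glob
import os
import threading
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_records(path):
    """Count lines in one file (a stand-in for TFRecord entries).

    Returns (count, worker_id), where worker_id is the (pid, thread id)
    pair of the worker that read the file.
    """
    with open(path, "rb") as handle:
        count = sum(1 for _ in handle)
    return count, (os.getpid(), threading.get_ident())


def parallel_count(file_pattern, num_workers=4, mode="multi_processing"):
    """Count records in all files matching file_pattern, one task per file.

    mode is a hypothetical switch: "multi_processing" uses a process pool,
    anything else uses a thread pool.
    """
    paths = sorted(glob.glob(file_pattern))
    if not paths:
        # Nothing to read, so no pool is started.
        return 0, set()
    if mode == "multi_processing":
        pool_cls = ProcessPoolExecutor
    else:
        pool_cls = ThreadPoolExecutor
    with pool_cls(max_workers=num_workers) as pool:
        results = list(pool.map(count_records, paths))
    total = sum(count for count, _ in results)
    workers = {worker_id for _, worker_id in results}
    return total, workers
```

      Calling parallel_count('part-r-00*', num_workers=4) on the input files gives a reference point: if the returned worker set has more than one entry here while the Beam pipeline keeps a single core busy, the bottleneck is more likely in how the runner splits the source than in the machine or the files. On platforms that spawn worker processes, the process-pool call needs to sit under an if __name__ == '__main__': guard.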

              People

              • Assignee:
                Hannah Jiang (hannahjiang)
              • Reporter:
                Hannah Jiang (hannahjiang)

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 3h 20m