Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-11158

Side Inputs to beam.Partition

Details

    • Bug
    • Status: Triage Needed
    • P3
    • Resolution: Unresolved
    • None
    • None
    • sdk-py-core
    • None

    Description

      Side inputs work with a regular ParDo and function, but I can't seem to get it to work with beam.Partition. The code and exception below demonstrates the problem.

      ```
      import apache_beam as beam

      def main():

      class SideFn(beam.PartitionFn):
      def partition_for(self, element, *args, **kwargs):
      print(element, args, kwargs)

      def just_print(element, *args, **kwargs):
      print(element, args, kwargs)

      with beam.Pipeline() as p:
      side = p | 'CreateSide' >> beam.Create(['a'])
      p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99, side=beam.pvalue.AsSingleton(side))

      1. p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99, side=beam.pvalue.AsSingleton(side))

      if _name_ == '_main_':
      main()
      ```

      /Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python /Users/joetoth/projects/joetoth.com/psy/part.py
      Traceback (most recent call last):
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 134, in <genexpr>
      (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
      StopIteration

      The above exception was the direct cause of the following exception:

      Traceback (most recent call last):
      File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
      new_kwargs = dict(
      RuntimeError: generator raised StopIteration

      During handling of the above exception, another exception occurred:

      Traceback (most recent call last):
      File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
      main()
      File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
      p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99, side=beam.pvalue.AsSingleton(side))
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 568, in _exit_
      self.result = self.run()
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py", line 547, in run
      return self.runner.run_pipeline(self, self._options)
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
      return runner.run_pipeline(pipeline, options)
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 175, in run_pipeline
      self._latest_run_result = self.run_via_runner_api(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
      return self.run_stages(stage_context, stages)
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 344, in run_stages
      stage_results = self._run_stage(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 527, in _run_stage
      last_result, deferred_inputs, fired_timers = self._run_bundle(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 571, in _run_bundle
      result, splits = bundle_manager.process_bundle(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
      result_future = self._worker_handler.control_conn.push(process_bundle_req)
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
      response = self.worker.do_instruction(request)
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
      return getattr(self, request_type)(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
      bundle_processor.process_bundle(instruction_id))
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 984, in process_bundle
      input_op_by_transform_id[element.transform_id].process_encoded(
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
      self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/_init_.py", line 446, in raise_with_traceback
      raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      File "apache_beam/runners/common.py", line 804, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      File "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py", line 133, in insert_values_in_args
      new_kwargs = dict(
      RuntimeError: generator raised StopIteration [while running 'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']

      Process finished with exit code 1

      Attachments

        Activity

          People

            Unassigned Unassigned
            weazelb0y Joseph Toth
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: