Beam / BEAM-14112

ReadFromBigQuery cannot be used with the interactive runner

Details

    • Type: Improvement
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.35.0, 2.36.0, 2.37.0
    • Fix Version/s: 2.39.0
    • None

    Description

      A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with the Python interactive runner.

      The error can be reproduced with the following code:

      #!/usr/bin/env python
      """Reproduce pickle issue when using RFBQ in interactive runner."""

      import apache_beam as beam
      from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
      import apache_beam.runners.interactive.interactive_beam as ib


      options = beam.options.pipeline_options.PipelineOptions(
          project="...",
          temp_location="...",
      )

      pipeline = beam.Pipeline(InteractiveRunner(), options=options)
      pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
      print(ib.collect(pcoll))

      Running this script fails with the following traceback:

      Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
        File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
        File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
        File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
        File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
        File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
        File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
          lambda x: dumps(x, protocol), pickle.loads)
      TypeError: can't pickle generator objects
      
      During handling of the above exception, another exception occurred:
      
      Traceback (most recent call last):
        File "repro.py", line 16, in <module>
          print(ib.collect(pcoll))
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
          return func(*args, **kwargs)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
          recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
          self.user_pipeline.options).run()
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
          return self.deduce_fragment().run()
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
          return self.runner.run_pipeline(self, self._options)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
          pipeline_to_execute.run(), pipeline_instrument)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
          return self.runner.run_pipeline(self, self._options)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
          pipeline.to_runner_api(default_environment=self._default_environment))
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
          return self.run_stages(stage_context, stages)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
          runner_execution_context, bundle_context_manager)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
          bundle_manager))
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
          data_input, data_output, input_timers, expected_timer_output)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
          result_future = self._worker_handler.control_conn.push(process_bundle_req)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
          response = self.worker.do_instruction(request)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
          getattr(request, request_type), request.instruction_id)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
          element.data)
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
        File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
        File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
        File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
        File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
        File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
        File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
        File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
        File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
        File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
          lambda x: dumps(x, protocol), pickle.loads)
      TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']

      I suspect the error is caused by this change that was first released in 2.35.0: https://github.com/apache/beam/pull/15610
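
The last frames of the traceback show a pickle-based coder calling `dumps` on an element while estimating its size, and failing because that element is a generator. The TypeError itself can be reproduced without Beam or BigQuery. The sketch below is only an illustration of that Python behavior, assuming a generator object reaches the pickling step; `rows` and `try_pickle` are hypothetical helpers, not Beam APIs, and the row contents are made up for the example.

```python
import pickle


def rows():
    """Hypothetical stand-in for a lazily produced sequence of rows."""
    yield {"f0_": 1}


def try_pickle(value):
    """Return the pickled bytes, or the TypeError raised while pickling."""
    try:
        return pickle.dumps(value)
    except TypeError as exc:
        return exc


# A generator object cannot be pickled, so this holds a TypeError.
lazy_result = try_pickle(rows())

# Materializing the generator into a list first makes it picklable.
eager_result = try_pickle(list(rows()))

print(type(lazy_result).__name__, lazy_result)
print(pickle.loads(eager_result))
```

The exact wording of the error message depends on the Python version, so the sketch only relies on the exception type.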


            People

              Assignee: Ning (ningk)
              Reporter: Chun Yang (cccyang)
              Votes: 0
              Watchers: 2


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10.5h