Beam / BEAM-9417

Unable to read from BigQuery and the file system in the same pipeline

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.20.0
    • Component/s: io-py-gcp
    • Environment: MacBook Pro, macOS Catalina, Python 3.7, apache-beam[gcp]==2.19.0

    Description

      I am trying to read from BigQuery and from the local file system in my Apache Beam (apache-beam[gcp]) pipeline.

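      import apache_beam as beam
      from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

      # PreProcessOptions (custom options providing bq_project and customer) and
      # UnicodeCoder are defined elsewhere in the script and are not shown here.

      pipeline_options = PipelineOptions()
      options = pipeline_options.view_as(PreProcessOptions)
      options.view_as(SetupOptions).save_main_session = True

      p = beam.Pipeline(options=options)

      apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
       .format(bq_project=options.bq_project, customer=options.customer)

      file_path = "mycsv.csv.gz"

      # Two independent reads in the same pipeline: BigQuery and a local gzipped file.
      apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
      preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

      p.run().wait_until_finish()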
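
The query string in the snippet above is built with plain Python string formatting: the trailing backslash continues the line so `.format()` applies to the whole literal, and the backtick-quoted table name is BigQuery standard SQL syntax (which is why `use_standard_sql=True` is passed). A minimal sketch isolating that step; `build_apn_query` and the sample values `my-project` and `acme` are hypothetical stand-ins for `options.bq_project` and `options.customer`:

```python
def build_apn_query(bq_project, customer):
    """Build the APN lookup query the same way the pipeline snippet does.

    Hypothetical helper: bq_project and customer stand in for
    options.bq_project and options.customer.
    """
    # Backticks quote the fully qualified table name in BigQuery standard SQL;
    # the customer value is interpolated directly inside single quotes.
    return (
        "select * from `{bq_project}.config.apn` where customer='{customer}'"
        .format(bq_project=bq_project, customer=customer)
    )


query = build_apn_query("my-project", "acme")
print(query)
```

Because `customer` is interpolated directly inside single quotes, a value containing `'` would produce invalid SQL; the sketch deliberately keeps the original behavior unchanged.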

      When I run this job, I get the following error:

      Traceback (most recent call last):
        File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
          run()
        File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
          p.run().wait_until_finish()
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
          self._options).run(False)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
          return self.runner.run_pipeline(self, self._options)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
          return runner.run_pipeline(pipeline, options)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
          pipeline.replace_all(_get_transform_overrides(options))
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
          self._replace(override)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
          self.visit(TransformUpdater(self))
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
          self._root_transform().visit(visitor, self, visited)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
          part.visit(visitor, pipeline, visited)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
          part.visit(visitor, pipeline, visited)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
          part.visit(visitor, pipeline, visited)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
          visitor.visit_transform(self)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
          self._replace_if_needed(transform_node)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
          new_output = replacement_transform.expand(input_node)
        File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
          invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
        File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
      TypeError: create_invoker() takes at least 2 positional arguments (1 given)
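
The last two frames show where the error is raised: `sdf_direct_runner.py` calls `DoFnInvoker.create_invoker(signature, process_invocation=False)` with a single positional argument, while `create_invoker` (which appears to be a Cython-compiled function, since its frame has no source line) requires at least two. The pure-Python sketch below reproduces only that call-shape mismatch; `make_invoker` and its `second_arg` parameter are hypothetical stand-ins, not Beam's real signature. Plain CPython words the error as "missing 1 required positional argument" rather than Cython's "takes at least 2 positional arguments (1 given)".

```python
def make_invoker(signature, second_arg, process_invocation=True):
    """Hypothetical stand-in that, like the failing callee, needs two positional args."""
    return signature, second_arg, process_invocation


def call_like_traceback():
    # Same call shape as line 87 of sdf_direct_runner.py in the traceback:
    # one positional argument plus a keyword argument.
    return make_invoker("signature", process_invocation=False)


try:
    call_like_traceback()
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

The mismatch sits between two modules of the SDK (a runner override calling a helper in `common.py`), not in the pipeline code itself.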

      But if I run my code like this (reading only from the file system):

      pipeline_options = PipelineOptions()
      options = pipeline_options.view_as(PreProcessOptions)
      options.view_as(SetupOptions).save_main_session = True
      
      p = beam.Pipeline(options=options)
      file_path = "mycsv.csv.gz"
      preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

      or like this (reading only from BigQuery):

      pipeline_options = PipelineOptions()
      options = pipeline_options.view_as(PreProcessOptions)
      options.view_as(SetupOptions).save_main_session = True
      
      p = beam.Pipeline(options=options)
      
      apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
       .format(bq_project=options.bq_project, customer=options.customer)
      
      
      apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))

      or even like this (reading from BigQuery twice):

      pipeline_options = PipelineOptions()
      options = pipeline_options.view_as(PreProcessOptions)
      options.view_as(SetupOptions).save_main_session = True
      
      p = beam.Pipeline(options=options)
      
      apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
       .format(bq_project=options.bq_project, customer=options.customer)
      
      
      apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
      apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))

      the code works fine.

      Is it a limitation of Apache Beam that a single pipeline cannot read from two different sources (BigQuery and the local file system)?

      If so, can this feature be added?
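
The issue was resolved as a bug rather than a missing feature: it is marked Fixed, with 2.20.0 listed as the fix version, while the report was made against apache-beam 2.19.0. A pipeline that must combine both reads could therefore check the installed SDK version before building the pipeline. A minimal sketch, assuming plain three-part `major.minor.patch` version strings; `FIXED_IN`, `parse_version`, and `has_read_fix` are hypothetical names, and pre-release suffixes such as `rc1` or `.dev0` are not handled:

```python
# Hypothetical threshold taken from the issue's fix version (2.20.0).
FIXED_IN = (2, 20, 0)


def parse_version(version):
    """Turn 'major.minor.patch' into a comparable tuple of ints.

    Assumption: exactly three numeric components, no pre-release/dev suffixes.
    """
    return tuple(int(part) for part in version.split(".")[:3])


def has_read_fix(version):
    """Return True if this SDK version is at or above the fix version."""
    return parse_version(version) >= FIXED_IN


for v in ["2.19.0", "2.20.0", "2.21.0"]:
    print(v, has_read_fix(v))
```

In a real script the version string could come from `apache_beam.__version__`, and a `False` result could raise an error asking for an upgrade instead of failing later with the `TypeError` above.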

People

    • Assignee: Unassigned
    • Reporter: Deepak Verma (yesdeepakverma)
    • Votes: 0
    • Watchers: 1
