Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Environment: MacBook Pro (macOS Catalina), Python 3.7, apache-beam[gcp]==2.19.0
Description
I am trying to read from BigQuery and the local file system in my apache-beam[gcp] pipeline.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)

apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
    .format(bq_project=options.bq_project, customer=options.customer)
file_path = "mycsv.csv.gz"

apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
When I run this job, I get the error below:
Traceback (most recent call last):
  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
    run()
  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
    p.run().wait_until_finish()
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
But if I run my code like this
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
file_path = "mycsv.csv.gz"
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
or like this
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
.format(bq_project=options.bq_project, customer=options.customer)
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
or even like this
pipeline_options = PipelineOptions()
options = pipeline_options.view_as(PreProcessOptions)
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)
apn_query = "select * from `{bq_project}.config.apn` where customer='{customer}'"\
.format(bq_project=options.bq_project, customer=options.customer)
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
the code just works fine.
Is this a limitation of Apache Beam when a pipeline reads from more than one kind of source (BigQuery and text files) at the same time?
If so, can this feature be added?
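For completeness, below is a minimal, self-contained sketch of the failing combination. The project, dataset, customer, and file names are placeholders, and it substitutes the standard PipelineOptions and the default text coder for my custom PreProcessOptions and UnicodeCoder, but it follows the same pattern as the first snippet above.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Placeholder options in place of the custom PreProcessOptions.
options = PipelineOptions()
options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=options)

# Placeholder project/dataset/customer values.
apn_query = "select * from `my-project.config.apn` where customer='my-customer'"

# Each read works on its own; combining them in one pipeline reproduces the
# TypeError above on apache-beam[gcp]==2.19.0 with the DirectRunner.
apn = p | 'ReadApnFromBQ' >> beam.io.Read(
    beam.io.BigQuerySource(query=apn_query, use_standard_sql=True))
preprocess_rows = p | 'ReadCsv' >> beam.io.ReadFromText("mycsv.csv.gz")

p.run().wait_until_finish()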