[BEAM-8452] TriggerLoadJobs.process in bigquery_file_loads: schema is passed as type str

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.15.0, 2.16.0
    • Fix Version/s: None
    • Component/s: sdk-py-core
    • Labels: None

    Description

I've found an issue with the BigQueryFileLoads transform and the type of its schema parameter: when the schema is supplied as a JSON string, it is passed through to the load job as a plain str instead of the TableSchema message the BigQuery client expects.

      Triggering job beam_load_2019_10_11_140829_19_157670e4d458f0ff578fbe971a91b30a_1570802915 to load data to BigQuery table <TableReference
       datasetId: 'pyr_monat_dev'
       projectId: 'icentris-ml-dev'
       tableId: 'tree_user_types'>. Schema: {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]}. Additional parameters: {}
      Retry with exponential backoff: waiting for 4.875033410381894 seconds before retrying _insert_load_job because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableSchema'> for field schema, found {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]} (type <class 'str'>)
       Traceback for above exception (most recent call last):
        File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 206, in wrapper
          return fun(*args, **kwargs)
        File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 344, in _insert_load_job
          **additional_load_parameters
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 791, in __init__
          setattr(self, name, value)
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 973, in __setattr__
          object.__setattr__(self, name, value)
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1652, in __set__
          super(MessageField, self).__set__(message_instance, value)
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1293, in __set__
          value = self.validate(value)
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1400, in validate
          return self.__validate(value, self.validate_element)
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1358, in __validate
          return validate_element(value)   
        File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1340, in validate_element
          (self.type, name, value, type(value)))
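
      The failure can be reproduced outside the pipeline. Judging by the traceback, _insert_load_job builds an apitools message whose schema field only accepts a TableSchema, so assigning the JSON string raises the ValidationError. A minimal sketch, assuming JobConfigurationLoad is the message being constructed:

      from apitools.base.protorpclite.messages import ValidationError
      from apache_beam.io.gcp.internal.clients import bigquery

      # Assigning a plain str to the message-typed schema field trips the same
      # apitools ValidationError seen in the retry loop above.
      schema_str = ('{"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, '
                    '{"name": "description", "type": "STRING", "mode": "nullable"}]}')
      try:
          bigquery.JobConfigurationLoad(schema=schema_str)
      except ValidationError as e:
          print(e)  # Expected type ... TableSchema for field schema ... (type <class 'str'>)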
      The triggering code looks like this:

       
      import apache_beam as beam
      from apache_beam.options.pipeline_options import DebugOptions, SetupOptions

      # Route WriteToBigQuery through the new file-loads-based sink
      options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
      # Save main session state so pickled functions and classes
      # defined in __main__ can be unpickled
      options.view_as(SetupOptions).save_main_session = True
      custom_options = options.view_as(LoadSqlToBqOptions)
      with beam.Pipeline(options=options) as p:
          (p
              | "Initializing with empty collection" >> beam.Create([1])
              | "Reading records from CloudSql" >> beam.ParDo(ReadFromRelationalDBFn(
                  username=custom_options.user,
                  password=custom_options.password,
                  database=custom_options.database,
                  table=custom_options.table,
                  key_field=custom_options.key_field,
                  batch_size=custom_options.batch_size))
              | "Converting Row Object for BigQuery" >> beam.ParDo(BuildForBigQueryFn(custom_options.bq_schema))
              | "Writing to BigQuery" >> beam.io.WriteToBigQuery(
                      table=custom_options.bq_table,
                      schema=custom_options.bq_schema,
                      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
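
      A workaround that should sidestep the validation error is to parse the JSON schema string into a TableSchema message up front and pass that object as the schema= argument to WriteToBigQuery instead of the raw str. A minimal sketch, assuming bq_schema holds a '{"fields": [...]}' JSON string:

      from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

      # Convert the JSON string into the TableSchema message the load job
      # validates against, then hand table_schema to WriteToBigQuery.
      schema_json = ('{"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, '
                     '{"name": "description", "type": "STRING", "mode": "nullable"}]}')
      table_schema = parse_table_schema_from_json(schema_json)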
       

       

People

    Assignee: Unassigned
    Reporter: Noah Goodrich (ngoodrich)
    Votes: 0
    Watchers: 3

Dates

    Created:
    Updated:

Time Tracking

    Estimated: Not Specified
    Remaining: 0h
    Logged: 2h 20m