Beam / BEAM-10524

Default decoder for ReadFromBigQuery does not support repeatable fields

Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.23.0
    • Fix Version/s: 2.25.0
    • Component/s: sdk-py-core
    • Labels: None

    Description

      The code in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L570 handles decoding of fields with mode "REPEATED" incorrectly. A query that returns repeated fields, which are represented as JSON arrays, fails with the following stack trace:

      ...
        File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/concat_source.py", line 89, in read
          range_tracker.sub_range_tracker(source_ix)):
        File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/textio.py", line 210, in read_records
          yield self._coder.decode(record)
        File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 566, in decode
          return self._decode_with_schema(value, self.fields)
        File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 580, in _decode_with_schema
          value[field.name], field.fields)
        File "/Users/roman/src/ml/data-pipelines/etl/venv/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 575, in _decode_with_schema
          value[field.name] = None
      TypeError: list indices must be integers or slices, not str
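
The failure can be reproduced outside Beam with a simplified stand-in for the decoder. This is only a sketch: the `Field` tuple and `decode_without_repeated_support` are hypothetical names, not Beam's code, and the logic is reduced to the two steps visible in the trace. When a RECORD field is repeated, the recursive call receives a list instead of a dict, the `not in` check succeeds against the list, and the assignment of `None` raises.

```python
from collections import namedtuple

# Hypothetical stand-in for a BigQuery schema field (not Beam's class).
Field = namedtuple("Field", ["name", "type", "mode", "fields"])


def decode_without_repeated_support(value, schema_fields):
    """Simplified decoder that ignores field.mode, as in the trace above."""
    for field in schema_fields:
        if field.name not in value:
            # For a repeated record, `value` is a list here, so this fails.
            value[field.name] = None
            continue
        if field.type == "RECORD":
            value[field.name] = decode_without_repeated_support(
                value[field.name], field.fields)
    return value


schema = [
    Field("items", "RECORD", "REPEATED", [
        Field("sku", "STRING", "NULLABLE", []),
    ]),
]
row = {"items": [{"sku": "a"}]}

error_message = None
try:
    decode_without_repeated_support(row, schema)
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```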

       

      The fix could look something like this (untested):

      def _decode_with_schema(self, value, schema_fields):
          for field in schema_fields:
              if field.name not in value:
                  # The field exists in the schema, but it doesn't exist in this row.
                  # It probably means its value was null, as the extract to JSON job
                  # doesn't preserve null fields
                  value[field.name] = None
                  continue
      
              if field.type == 'RECORD':
                  if field.mode == 'REPEATED':
                      value[field.name] = [self._decode_with_schema(val, field.fields)
                                           for val in value[field.name]]
                  else:
                      value[field.name] = self._decode_with_schema(value[field.name],
                                                                   field.fields)
              else:
                  try:
                      converter = self._converters[field.type]
                  except KeyError:
                      # No need to do any conversion
                      continue
      
                  if field.mode == 'REPEATED':
                      # Build a list: on Python 3, map() returns a lazy iterator
                      value[field.name] = [converter(val) for val in value[field.name]]
                  else:
                      value[field.name] = converter(value[field.name])
          return value
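
To check the idea without a BigQuery export, the approach can be wrapped in a small standalone harness. This is a sketch under assumptions: `Field`, `SchemaDecoder`, and the two-entry converter table are illustrative and not Beam's actual classes or converter set. It builds lists for repeated values so the result is a plain list on Python 3.

```python
from collections import namedtuple

# Hypothetical stand-in for a BigQuery schema field (not Beam's class).
Field = namedtuple("Field", ["name", "type", "mode", "fields"])


class SchemaDecoder:
    """Illustrative decoder that handles REPEATED records and scalars."""

    def __init__(self, fields):
        self.fields = fields
        # Assumed converter table, reduced to two types for the example.
        self._converters = {"INTEGER": int, "FLOAT": float}

    def decode(self, value):
        return self._decode_with_schema(value, self.fields)

    def _decode_with_schema(self, value, schema_fields):
        for field in schema_fields:
            if field.name not in value:
                # Field is in the schema but missing from the row: treat as null.
                value[field.name] = None
                continue
            if field.type == "RECORD":
                if field.mode == "REPEATED":
                    value[field.name] = [
                        self._decode_with_schema(val, field.fields)
                        for val in value[field.name]
                    ]
                else:
                    value[field.name] = self._decode_with_schema(
                        value[field.name], field.fields)
                continue
            converter = self._converters.get(field.type)
            if converter is None:
                continue  # No conversion needed for this type.
            if field.mode == "REPEATED":
                value[field.name] = [converter(v) for v in value[field.name]]
            else:
                value[field.name] = converter(value[field.name])
        return value


schema = [
    Field("id", "INTEGER", "NULLABLE", []),
    Field("scores", "FLOAT", "REPEATED", []),
    Field("items", "RECORD", "REPEATED", [
        Field("qty", "INTEGER", "NULLABLE", []),
        Field("note", "STRING", "NULLABLE", []),
    ]),
]
row = {"id": "7", "scores": ["1.5", "2"],
       "items": [{"qty": "3"}, {"qty": "4", "note": "x"}]}

decoded = SchemaDecoder(schema).decode(row)
print(decoded)
```

Note that, like the snippet above, the harness mutates the input row in place and returns it.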
      

            People

              Assignee: Kamil Wasilewski (kamilwu)
              Reporter: Roman Frigg (roman-captural)
              Votes: 0
              Watchers: 3

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h