Beam / BEAM-10666

WriteToBigQuery: Schema auto-detection is not supported.

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version: 2.23.0
    • Fix Version: None
    • Component: io-py-gcp
    • Environment: Python 3.7.8, Debian 9
      # Version 2.b: Includes side input as an argument in ParDo
      import apache_beam as beam
      from google.cloud import storage


      def csv_headers(csv_uri: str, client: storage.Client, chunk: int = 512, sep: str = ','):
          
          """ A function that accepts a Cloud Storage URI and returns a list of header names.
          
          Args:
              csv_uri (string): the GCS URI of the blob to be parsed.
              client (google.cloud.storage.client.Client): An instance of the Google
                  Cloud storage client.
              chunk (int): An approximation representing twice the size of a row
                  in bytes.
              sep (string): the separating character in between observations or fields
          
          Returns a list of header names.
          """
          
          SCHEME = csv_uri[:5]  # 'gs://'
          # Split only on the first '/' so object names containing slashes still parse.
          BUCKET, BLOB = csv_uri[len(SCHEME):].split('/', 1)
          bucket = client.get_bucket(BUCKET)
          blob = bucket.get_blob(BLOB)
          # Download only the first chunk bytes and take the first line as the header row.
          headers = blob.download_as_string(end=chunk).decode('utf-8').splitlines()[0].split(sep)
          return headers


      class CSVToDict(beam.DoFn):
          
          def process(self, elem, headers):
              
              """ Custom ParDo method that overrides the beam.DoFn process method. Accepts an element
              and custom headers as side input to convert from CSV rows to a key-value mapped row. Converts
              the date format into one that BigQuery will accept (a derivative of ISO-8601).
              
              Args:
                  elem (string): Text lines from ReadFromText
                  headers: Separately parsed headers from the CSV file.
                  
              Returns the parsed row wrapped in a list.
              """
              
              elems = elem.split(',')
              # Convert MM/DD/YYYY to YYYY-MM-DD so BigQuery accepts the DATE value.
              month, day, year = elems[1].split('/')
              elems[1] = '-'.join([year, month, day])
              elem_map = {key: value for key, value in zip(headers[1:], elems[1:])}
              return [elem_map]

          
      client, CSV_URI = storage.Client(), 'gs://[REDACTED]'
      headers = csv_headers(CSV_URI, client)

      with beam.Pipeline() as p:
          GCStoBigQuery = (
              p
              | beam.io.ReadFromText(CSV_URI, skip_header_lines=1)
              | beam.ParDo(CSVToDict(), headers)
              | beam.io.WriteToBigQuery(
                  table=[REDACTED],
                  schema='SCHEMA_AUTODETECT'
              )
          )
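
      A note on the side input: in the listing above, headers is an ordinary Python list computed on the driver before the pipeline runs, so it is passed to the ParDo as a plain argument rather than as a deferred side input. A minimal sketch of the deferred variant, assuming csv_headers may be invoked inside a transform (the transform labels are illustrative):

      with beam.Pipeline() as p:
          headers_pc = (
              p
              | 'HeaderURI' >> beam.Create([CSV_URI])
              # Parse the header row inside the pipeline; the storage client is
              # created per call so nothing unpicklable is closed over.
              | 'ParseHeaders' >> beam.Map(lambda uri: csv_headers(uri, storage.Client()))
          )
          rows = (
              p
              | 'ReadRows' >> beam.io.ReadFromText(CSV_URI, skip_header_lines=1)
              # AsSingleton materializes the one-element PCollection as the
              # headers argument of CSVToDict.process.
              | 'ToDict' >> beam.ParDo(CSVToDict(), beam.pvalue.AsSingleton(headers_pc))
          )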

    Description

      This is regarding the WriteToBigQuery connector, which is supposed to support SCHEMA_AUTODETECT but throws an error indicating that schema auto-detection is not supported for streaming inserts into BigQuery (only for file loads), even when batching from a file.

      SCHEMA_AUTODETECT should be supported per the documentation (or consider adding more context to it): https://beam.apache.org/releases/pydoc/2.15.0/_modules/apache_beam/io/gcp/bigquery.html

      I am prototyping a CSV ingest workflow using both the DirectRunner and the InteractiveRunner; the results are the same.
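
      For what it's worth, the error message itself ties auto-detection to file loads. Below is a minimal sketch of the obvious thing to try, assuming the pipeline honors an explicitly requested insert method (Method.FILE_LOADS is part of the WriteToBigQuery API; the staging path is a hypothetical placeholder), reusing CSV_URI, headers, and CSVToDict from the listing above:

      with beam.Pipeline() as p:
          _ = (
              p
              | beam.io.ReadFromText(CSV_URI, skip_header_lines=1)
              | beam.ParDo(CSVToDict(), headers)
              | beam.io.WriteToBigQuery(
                  table='[REDACTED]',
                  schema='SCHEMA_AUTODETECT',
                  # Request batch load jobs explicitly so the write is not routed
                  # to the streaming-inserts path that rejects SCHEMA_AUTODETECT.
                  method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                  # Load jobs stage files in GCS; hypothetical placeholder path.
                  custom_gcs_temp_location='gs://[REDACTED]/tmp'
              )
          )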
       

      Working in Python 3 (3.7.8, packaged by conda-forge).

      Installed via pip install apache_beam[gcp]

      apache_beam.__version__ = 2.23.0

       


          People

            Assignee: Unassigned
            Reporter: Yvan J. Aquino (yaquino)
