Details
Bug
Status: Open
P1
Resolution: Unresolved
2.34.0
None
None
Description
I am trying to write to BigQuery with multiple destination tables, and I would like the tables to be created dynamically if they don't already exist.
bigquery_rows | "Writing to Bigquery" >> WriteToBigQuery(
    lambda e: compute_table_name(e),
    schema=compute_table_schema,
    additional_bq_parameters=additional_bq_parameters,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)
The function compute_table_name is quite simple; for now I am just trying to get it to work.
def compute_table_name(element):
    if element['table'] == 'table_id':
        del element['table']
        return "project_id:dataset.table_id"
The schema is detected correctly and the table IS created and populated with records. The problem is, the table ID I get is something along the lines of:
datasetId: 'dataset'
projectId: 'project_id'
tableId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP...
I have also tried returning a bigquery.TableReference object in my compute_table_name function, to no avail.
def compute_table_name(element):
    if element['table'] == 'Radio':
        del element['table']
        return TableReference(
            datasetId="dataset_id",
            projectId="project_id",
            tableId="table_id",
        )
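For comparison, here is a minimal sketch of a destination function that returns a table string for every element and leaves the element unmodified. Both versions of compute_table_name above appear to return a value only for the one matching 'table' value, and both delete a key from the element inside the callback. Whether either of those is related to the temporary-looking table ID is not established here. DEFAULT_TABLE and TABLE_ROUTES are hypothetical names introduced for illustration, and the "unrouted" table is an assumed fallback, not something from the original pipeline.

```python
# Hypothetical fallback destination; not part of the original pipeline.
DEFAULT_TABLE = "project_id:dataset.unrouted"

# Hypothetical routing map from the element's 'table' field to a
# destination string in the 'project:dataset.table' form used above.
TABLE_ROUTES = {
    "table_id": "project_id:dataset.table_id",
}


def compute_table_name(element):
    """Return a destination string for every element.

    The element is only read, never modified, and an unknown or missing
    'table' value maps to DEFAULT_TABLE instead of an implicit None.
    """
    return TABLE_ROUTES.get(element.get("table"), DEFAULT_TABLE)
```

Such a function could be passed as the first argument to WriteToBigQuery in the same way as in the snippet above.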