Beam / BEAM-10544

Select Types not equal with nested schema


    Details

    • Type: Bug
    • Status: Triage Needed
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: dsl-sql, sdk-java-core
    • Labels:

      Description

      When we use SqlTransform to join a table with a large nested schema to a flat table, we get a "Types not equal" error from `Select` [1].

      We cannot get the test for our SqlTransform usage to pass on the direct runner. All code is checked into Cloud Source Repositories (CSR) [2].

      Things of note:
      • Query planner: Calcite

      Query (the real business logic is much more complex, but this is enough to reproduce the issue in our test):
      ```sql
      SELECT
        t1.DeviceName AS DeviceName,
        t1.LinkName AS LinkName,
        t1.HostName AS HostName,
        t1.MeasuredAt AS MeasuredAt,
        t2.b_dBm AS b_dBm
      FROM
        RealtimeRows AS t1
      INNER JOIN
        -- BigQuery dimension side input
        TxPowerSideInput AS t2
      ON
        t1.DeviceName = t2.DeviceName
      ```

      In the test, the tables are created like so (in the real pipeline, the realtime rows come from Pub/Sub):
      ```java
      import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
      import org.apache.beam.sdk.schemas.transforms.Convert;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.Row;

      // This table has the same schema as the real incoming Pub/Sub messages
      // in the real-world use case.
      PCollection<Row> realtimeTestData = pipeline
          .apply("Read 1Hz staging",
              BigQueryIO
                  .readTableRowsWithSchema()
                  .fromQuery(
                      "SELECT * FROM `taara-db.jake_views.staging_sample_float`")
                  .usingStandardSql())
          // Convert the schema-aware TableRows into Beam Rows.
          .apply(Convert.toRows());

      PCollection<Row> txPowerCalcRows = pipeline
          .apply("Read Tx Power Calc Side Input",
              BigQueryIO
                  .readTableRowsWithSchema()
                  .fromQuery(
                      "SELECT * FROM `taara-db`.MANUFACTURING.tx_power_timeinvariant_calculations")
                  .usingStandardSql())
          .apply(Convert.toRows());
      ```
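      To run the same join on the direct runner without BigQuery access, the two inputs can be built in memory with `Create.of(...).withRowSchema(...)` and handed to `SqlTransform` as a `PCollectionTuple` whose tag IDs match the table names in the query. This is a minimal sketch under stated assumptions: only the column names used in the query come from this report; the nested `Metrics` struct, the class name `NestedJoinHarness`, and all values are hypothetical, and we have not checked whether this reproduces the "Types not equal" error. It assumes `beam-sdks-java-extensions-sql` and the direct runner are on the classpath.

      ```java
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.extensions.sql.SqlTransform;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.PCollectionTuple;
      import org.apache.beam.sdk.values.Row;
      import org.apache.beam.sdk.values.TupleTag;
      import org.joda.time.Instant;

      /** Hypothetical BigQuery-free harness for the join in this report. */
      public class NestedJoinHarness {
        // Hypothetical nested struct standing in for the large nested part of the real schema.
        static final Schema METRICS_SCHEMA =
            Schema.builder().addDoubleField("rx_power").build();

        // Flat columns referenced by the query plus one nested ROW field.
        static final Schema REALTIME_SCHEMA =
            Schema.builder()
                .addStringField("DeviceName")
                .addStringField("LinkName")
                .addStringField("HostName")
                .addDateTimeField("MeasuredAt")
                .addRowField("Metrics", METRICS_SCHEMA)
                .build();

        static final Schema TX_POWER_SCHEMA =
            Schema.builder().addStringField("DeviceName").addDoubleField("b_dBm").build();

        /** Builds both inputs; each tag ID becomes a table name visible to SqlTransform. */
        static PCollectionTuple buildTables(Pipeline pipeline) {
          Row metrics = Row.withSchema(METRICS_SCHEMA).addValue(-3.5).build();
          PCollection<Row> realtime =
              pipeline.apply(
                  "Create realtime rows",
                  Create.of(
                          Row.withSchema(REALTIME_SCHEMA)
                              .addValues("dev-1", "link-1", "host-1", new Instant(0L), metrics)
                              .build())
                      .withRowSchema(REALTIME_SCHEMA));
          PCollection<Row> txPower =
              pipeline.apply(
                  "Create tx power rows",
                  Create.of(Row.withSchema(TX_POWER_SCHEMA).addValues("dev-1", 1.25).build())
                      .withRowSchema(TX_POWER_SCHEMA));
          return PCollectionTuple.of(new TupleTag<Row>("RealtimeRows"), realtime)
              .and(new TupleTag<Row>("TxPowerSideInput"), txPower);
        }

        public static void main(String[] args) {
          Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
          // Same query as above, without the comment line.
          String sql =
              "SELECT t1.DeviceName AS DeviceName, t1.LinkName AS LinkName, "
                  + "t1.HostName AS HostName, t1.MeasuredAt AS MeasuredAt, t2.b_dBm AS b_dBm "
                  + "FROM RealtimeRows AS t1 "
                  + "INNER JOIN TxPowerSideInput AS t2 ON t1.DeviceName = t2.DeviceName";
          buildTables(pipeline).apply("Join to dimension Data", SqlTransform.query(sql));
          pipeline.run().waitUntilFinish();
        }
      }
      ```

      Widening `METRICS_SCHEMA` step by step toward the real nested schema is one way to narrow down which nested field triggers the failure.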

      Relevant Java snippet:
      ```java
      import org.apache.beam.sdk.extensions.sql.SqlTransform;

      // Assumed: `tables` is a PCollectionTuple whose tag IDs ("RealtimeRows",
      // "TxPowerSideInput") are the table names referenced in `sql`.
      PCollection<Row> out = tables
          .apply(
              "Join to dimension Data",
              SqlTransform
                  .query(sql)
                  .registerUdf("POW", Pow.class)
                  .registerUdf("SQRT", Sqrt.class)
                  .registerUdf("LOG10", Log10.class)
                  .registerUdf("GREATEST", Greatest.class)
                  .registerUdf("EXTRACT_OFFSET", ExtractArrayOffset.class)
                  .registerUdf("PARSE_TIMESTAMP", ParseTimestamp.class)
                  .registerUdf("UNIX_SECONDS", UnixSeconds.class));
      ```
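      The "Types not equal" message does not say which field differs, so one way to narrow it down is to compare two schemas that are expected to match field by field, recursing into nested rows, arrays, and maps. The sketch below is a hypothetical debugging helper (`SchemaDiff` is not part of Beam) built only on the public `Schema` and `Schema.FieldType` accessors. It reports missing or extra fields, type-name mismatches, and nullability mismatches. It ignores field order, logical-type identifiers, and metadata, and it is not a copy of the check in `Select`.

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import org.apache.beam.sdk.schemas.Schema;
      import org.apache.beam.sdk.schemas.Schema.Field;
      import org.apache.beam.sdk.schemas.Schema.FieldType;

      /** Hypothetical debugging helper: lists the paths where two schemas disagree. */
      public final class SchemaDiff {
        private SchemaDiff() {}

        public static List<String> diff(Schema expected, Schema actual) {
          List<String> out = new ArrayList<>();
          diffRows("", expected, actual, out);
          return out;
        }

        private static void diffRows(String prefix, Schema expected, Schema actual, List<String> out) {
          for (Field field : expected.getFields()) {
            String path = prefix + field.getName();
            if (!actual.hasField(field.getName())) {
              out.add(path + ": missing in actual schema");
              continue;
            }
            diffTypes(path, field.getType(), actual.getField(field.getName()).getType(), out);
          }
          for (Field field : actual.getFields()) {
            if (!expected.hasField(field.getName())) {
              out.add(prefix + field.getName() + ": unexpected extra field");
            }
          }
        }

        private static void diffTypes(String path, FieldType expected, FieldType actual, List<String> out) {
          if (expected.getTypeName() != actual.getTypeName()) {
            out.add(path + ": type " + expected.getTypeName() + " vs " + actual.getTypeName());
            return;
          }
          if (!expected.getNullable().equals(actual.getNullable())) {
            out.add(path + ": nullable " + expected.getNullable() + " vs " + actual.getNullable());
          }
          // Recurse into container types; same type name guarantees matching structure here.
          switch (expected.getTypeName()) {
            case ROW:
              diffRows(path + ".", expected.getRowSchema(), actual.getRowSchema(), out);
              break;
            case ARRAY:
            case ITERABLE:
              diffTypes(path + "[]", expected.getCollectionElementType(),
                  actual.getCollectionElementType(), out);
              break;
            case MAP:
              diffTypes(path + "{key}", expected.getMapKeyType(), actual.getMapKeyType(), out);
              diffTypes(path + "{value}", expected.getMapValueType(), actual.getMapValueType(), out);
              break;
            default:
              // Logical-type identifiers and metadata are not compared.
              break;
          }
        }
      }
      ```

      For example, logging `SchemaDiff.diff(a.getSchema(), b.getSchema())` for two `PCollection<Row>`s that should share a schema prints entries such as `Metrics.rx_power: nullable false vs true`.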

      [1] https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java#L203
      [2] https://source.cloud.google.com/taara-db/pso-taara-realtime-margin/+/master:streaming-join/streaming-join/src/test/java/com/google/x/taara/dataflow/transforms/RxTxPowersCorrFERCombinedSqlTransformIT.java

            People

            • Assignee: Unassigned
            • Reporter: Jacob Ferriero (data-runner0)
            • Votes: 0
            • Watchers: 2
