Flink / FLINK-7005

Optimization steps are missing for nested registered tables

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.3.1
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Tables that are registered (implicitly or explicitly) do not pass the first three optimization steps:

      • decorrelate
      • convert time indicators
      • normalize the logical plan

For example, the following currently produces a wrong plan:

      val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
      
      val table1 = tEnv.sql(s"""SELECT 1 + 1 FROM $table""") // not optimized
      val table2 = tEnv.sql(s"""SELECT myrt FROM $table1""")
      
      val results = table2.toAppendStream[Row]
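
One of the skipped steps is normalization of the logical plan, which includes expression reduction (constant folding), so `1 + 1` above is never reduced to `2`. A minimal, self-contained sketch of what that step does, using a toy expression tree rather than Flink's actual Calcite/RexNode types:

```scala
// Toy expression tree; names are illustrative, not Flink's internal API.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Col(name: String) extends Expr
case class Plus(l: Expr, r: Expr) extends Expr

// Constant folding: rewrite Plus(Lit, Lit) into a single literal,
// mirroring how the normalization phase reduces "1 + 1" to "2".
// If a plan never passes through this phase, the addition survives
// into the physical plan and is evaluated per record.
def fold(e: Expr): Expr = e match {
  case Plus(l, r) =>
    (fold(l), fold(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b)
      case (fl, fr)         => Plus(fl, fr)
    }
  case other => other
}
```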
      

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/4186

          FLINK-7005 [table] Optimization steps are missing for nested registered tables

This PR fixes the bug described in FLINK-7005 by adding another stage to the optimization phase that converts table scans to full plans. However, it makes two rules non-optional. Do we want to make those two rules configurable through the Calcite config as well?

          This should definitely be part of Flink 1.3.2.
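
The new stage can be pictured as a pre-optimization pass that replaces scans of registered tables with the full plans behind them, so that decorrelation, time-indicator conversion, and normalization see the whole tree. A toy sketch under that assumption (plain case classes, not Flink's or Calcite's RelNode classes):

```scala
// Toy relational plan; illustrative only, not the actual Flink/Calcite types.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Project(input: Plan, exprs: List[String]) extends Plan

// catalog maps registered table names to the plans they wrap.
// Expanding scans before running optimizer rules lets those rules
// see the nested plan instead of an opaque scan node.
def expand(plan: Plan, catalog: Map[String, Plan]): Plan = plan match {
  case Scan(name) if catalog.contains(name) =>
    expand(catalog(name), catalog) // recurse: registered tables may nest
  case Project(in, exprs) =>
    Project(expand(in, catalog), exprs)
  case other => other
}
```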

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-7005

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4186.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4186


          commit 5cfc366bba6899f2def57a92af4856ab3e63c0b0
          Author: twalthr <twalthr@apache.org>
          Date: 2017-06-26T13:22:11Z

          FLINK-7005 [table] Optimization steps are missing for nested registered tables


          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4186#discussion_r124016791

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---
@@ -422,4 +422,22 @@ class ExpressionReductionTest extends TableTestBase
     util.verifyTable(result, expected)
   }

+  @Test
+  def testNestedTablesReduction(): Unit = {
+    val util = streamTestUtil()
--- End diff ---

I suggest adding a `batchTestUtil()` test case as well.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/4186

          Looks good from my side. I think it's fine to make the CONV_RULES non-optional for now.

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4186#discussion_r124264730

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---
@@ -422,4 +422,22 @@ class ExpressionReductionTest extends TableTestBase
     util.verifyTable(result, expected)
   }

+  @Test
+  def testNestedTablesReduction(): Unit = {
+    val util = streamTestUtil()
--- End diff ---

          Thanks @sunjincheng121. I will add a batch test case.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4186

          twalthr Timo Walther added a comment -

          Fixed in 1.4.0: 02863f397f343502f037f62466643903fb246d7e
          Fixed in 1.3.2: 58b3b19c91539f11d3d2636090e3bec1dd7b75d0


            People

• Assignee:
  twalthr Timo Walther
• Reporter:
  twalthr Timo Walther
• Votes:
  0
• Watchers:
  3

              Dates

• Created:
• Updated:
• Resolved:

                Development