Flink / FLINK-6476: Table environment register row data stream

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
    • Environment: java/scala

      Description

      Registering streams of type Row as table sources is currently not possible:

      Java:

      DataStream<Row> ds = ...
      tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");

      org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into Table.
      at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
      at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
      at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
      at org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)

      Scala:

      val ds: DataStream[Row] = ...
      tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e")

      org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.common.typeutils.CompositeType

      This can be supported by extending getFieldInfo() in org.apache.flink.table.api.TableEnvironment and by constructing the StreamTableSource accordingly.
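The following is a minimal, self-contained Java sketch of the dispatch described above. It does not use Flink. `GetFieldInfoSketch`, `TypeModel`, and `RowTypeModel` are hypothetical stand-ins, and mapping user-supplied names to field indices purely by position is a simplifying assumption.

The row branch plays the role of the proposed RowTypeInfo case in getFieldInfo(). A type without field information, such as a generic type, falls through to the "cannot be converted into Table" error quoted in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of getFieldInfo()-style dispatch; not Flink code.
final class GetFieldInfoSketch {

    /** Stand-in for type information that knows nothing about fields (like a generic type). */
    static class TypeModel {
        private final String description;

        TypeModel(String description) {
            this.description = description;
        }

        @Override
        public String toString() {
            return description;
        }
    }

    /** Stand-in for a row type: unlike a generic type, it knows the row's arity. */
    static final class RowTypeModel extends TypeModel {
        final int arity;

        RowTypeModel(String description, int arity) {
            super(description);
            this.arity = arity;
        }
    }

    /** Maps user-supplied field names to field indices by position. */
    static List<Integer> fieldIndices(TypeModel type, String[] names) {
        if (type instanceof RowTypeModel) {
            // The proposed new case: a row type exposes enough structure to build a Table.
            RowTypeModel row = (RowTypeModel) type;
            if (names.length > row.arity) {
                throw new IllegalArgumentException("Too many field names for " + row);
            }
            List<Integer> indices = new ArrayList<>();
            for (int i = 0; i < names.length; i++) {
                indices.add(i);
            }
            return indices;
        }
        // Anything without field information falls through to an error.
        throw new IllegalArgumentException(
            "Source of type " + type + " cannot be converted into Table.");
    }
}
```

The real getFieldInfo() also handles other composite types, such as tuples and POJOs. The sketch keeps only the two cases relevant to this issue.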


          Activity

          rtudoran radu added a comment -

          Fabian Hueske, sunjincheng, shijinkui, Stefano Bortoli, hongyuhong, Timo Walther

          I found that there is no support for registering streams of type Row as table sources. I believe we should support this so that the way we create table sources is more generic. Potentially, we could pass the row type along with the stream by creating a dedicated interface.

          What do you think?

          fhueske Fabian Hueske added a comment -

          Hi radu,

          you are right, it should be possible to convert a DataStream<Row> (or DataSet<Row>) into a Table.
          As you noticed, TableEnvironment.getFieldInfo() lacks support for RowTypeInfo.

          Another issue is that it is not possible to reliably extract the TypeInformation for Row (neither from signatures nor from object instances).
          By default, Flink will use a GenericType<Row> for a DataSet<Row> or DataStream<Row>. Since GenericType<Row> does not contain any information about the fields of a Row, it cannot be used to create a Table. (This issue was addressed in FLINK-6059.)
          Therefore, users must manually specify a RowTypeInfo to create a Table.

          The Java API offers the DataStream.returns() method to hint types. In Scala, we use an implicit value to override the automatically extracted type:

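          import org.apache.flink.api.common.typeinfo.TypeInformation
          import org.apache.flink.api.java.typeutils.RowTypeInfo
          import org.apache.flink.streaming.api.scala._
          import org.apache.flink.table.api.Types
          import org.apache.flink.types.Row

          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val data = List(Row.of("Hello", "Worlds", Int.box(1)))
          // tpe is automatically chosen when an implicit value of type TypeInformation[Row] is requested.
          implicit val tpe: TypeInformation[Row] = new RowTypeInfo(Types.STRING, Types.STRING, Types.INT)
          val stream = env.fromCollection(data)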
          
          rtudoran radu added a comment -

          Fabian Hueske

          I also tried setting the type of the DataStream manually, and it still did not work, at least for the Java version.

          I had:
          RowTypeInfo typeinfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
          DataStream<Row> ds = ... .returns(typeinfo);
          tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");

          but got the same error...

          So I take it you are fine with this if I implement it?

          fhueske Fabian Hueske added a comment -

          Yes, it does not work and should be supported.

          I just wanted to point out that it cannot work if the type of the DataStream<Row> / DataSet<Row> is a GenericType<Row>.
          We should support streams / sets with RowTypeInfo.

          rtudoran radu added a comment -

          Fabian Hueske Great. I will try to implement this and will ping you once I have something.

          githubbot ASF GitHub Bot added a comment -

          GitHub user rtudoran opened a pull request:

          https://github.com/apache/flink/pull/3845

          FLINK-6476 - Table environment register row data stream

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [x] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/huawei-flink/flink FLINK-6476Re2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3845.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3845


          commit 2ae9f3b18836c7836c4506f491301b0807f2c868
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T14:14:26Z

          Add support for registering as table row streams

          commit 8c0b361660f78129ca569c6a99bb41bada629dc8
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T15:31:19Z

          Add support for registering streams of Row type as tables


          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3845

          @fhueske

          Please have a look. I added tests in Scala and Java to check the registration of the stream.
          For Scala, it seemed to work out of the box if the implicit value is set (I did not know to test this before opening the JIRA).
          For Java, I have added the needed support in TableEnvironment.

          It is not much... I hope I did not miss anything.

          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3845

          I will close this PR and open a new one with only one commit.

          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran closed the pull request at:

          https://github.com/apache/flink/pull/3845

          githubbot ASF GitHub Bot added a comment -

          GitHub user rtudoran opened a pull request:

          https://github.com/apache/flink/pull/3846

          FLINK-6476 - Table environment register row data stream

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/huawei-flink/flink FLINK-6476Re2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3846.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3846


          commit 2ae9f3b18836c7836c4506f491301b0807f2c868
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T14:14:26Z

          Add support for registering as table row streams

          commit 8c0b361660f78129ca569c6a99bb41bada629dc8
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T15:31:19Z

          Add support for registering streams of Row type as tables


          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran closed the pull request at:

          https://github.com/apache/flink/pull/3846

          githubbot ASF GitHub Bot added a comment -

          GitHub user rtudoran opened a pull request:

          https://github.com/apache/flink/pull/3847

          FLINK-6476 - Table environment register row data stream

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/huawei-flink/flink FLINK-6476Re3

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3847.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3847


          commit bfe2882b298d5abd5bbfabbcd79481d2003cba91
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T16:26:43Z

          Add support for registering streams of Row type as tables


          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3847

          @fhueske

          Please have a look. I added tests in Scala and Java to check the registration of the stream.
          For Scala, it seemed to work out of the box if the implicit value is set (I did not know to test this before opening the JIRA).
          For Java, I have added the needed support in TableEnvironment.

          It is not much... I hope I did not miss anything.

          (I closed the previous PR because I mistakenly also pushed an intermediate commit.)

          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran closed the pull request at:

          https://github.com/apache/flink/pull/3847

          githubbot ASF GitHub Bot added a comment -

          GitHub user rtudoran opened a pull request:

          https://github.com/apache/flink/pull/3849

          FLINK-6476 - Table environment register row data stream

          Added tests for registering row streams in java and scala

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/huawei-flink/flink FLINK-6476Re4

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3849.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3849


          commit 0bfd5040626dcc2f359902e21b04cd45924d33e1
          Author: rtudoran <tudoranradu@ymail.com>
          Date: 2017-05-08T18:30:57Z

          Add java support to register streams of type row.
          Added tests for registering row streams in java and scala


          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3849

          @fhueske

          Please have a look. I added tests in Scala and Java to check the registration of the stream.
          For Scala, it seemed to work out of the box if the implicit value is set (I did not know to test this before opening the JIRA).
          For Java, I have added the needed support in TableEnvironment.

          After the first opened and closed PR, I realized that I had forgotten to test the case where the Row also carries field names. That test is now added, together with the corresponding support.
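The case of a Row that carries field names can be illustrated with a small, self-contained Java model. `NamedRowTypeSketch` is hypothetical, not Flink's RowTypeInfo. It mimics only two behaviors the discussion relies on:

- When no names are supplied, fields are named f0, f1, and so on. This is the format visible in the Row(f0: Integer, ...) type string of the stack trace in the description.
- A name lookup returns -1 for an unknown field. This is the case the `idx < 0` check in the reviewed diff guards against.

```java
import java.util.Arrays;

// Hypothetical model of a row type that carries field names; not Flink's RowTypeInfo.
final class NamedRowTypeSketch {
    private final String[] fieldTypes;
    private final String[] fieldNames;

    private NamedRowTypeSketch(String[] fieldTypes, String[] fieldNames) {
        if (fieldTypes.length != fieldNames.length) {
            throw new IllegalArgumentException("Number of field types and names must match.");
        }
        this.fieldTypes = fieldTypes.clone();
        this.fieldNames = fieldNames.clone();
    }

    /** Row type without explicit names: fields default to f0, f1, ... */
    static NamedRowTypeSketch withDefaultNames(String... fieldTypes) {
        String[] names = new String[fieldTypes.length];
        for (int i = 0; i < names.length; i++) {
            names[i] = "f" + i;
        }
        return new NamedRowTypeSketch(fieldTypes, names);
    }

    /** Row type whose fields carry user-chosen names. */
    static NamedRowTypeSketch withNames(String[] fieldTypes, String[] fieldNames) {
        return new NamedRowTypeSketch(fieldTypes, fieldNames);
    }

    /** Index of the named field, or -1 if the row has no such field. */
    int getFieldIndex(String name) {
        return Arrays.asList(fieldNames).indexOf(name);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Row(");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(fieldNames[i]).append(": ").append(fieldTypes[i]);
        }
        return sb.append(')').toString();
    }
}
```

Once a row carries its own names, the default names such as f0 no longer resolve. A name-based lookup must therefore use the names the row actually carries.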

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3849#discussion_r115339279

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) {
            case _ => throw new TableException(
              "Field reference expression or alias on field expression expected.")
          }

          + case r: RowTypeInfo => {
          + // r.getFieldNames().map(name =>
          — End diff –

          Please remove the commented-out code.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3849#discussion_r115339883

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java —
          @@ -30,11 +30,125 @@
          import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
          import org.apache.flink.table.api.java.stream.utils.StreamTestData;
          import org.junit.Test;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;

          import java.util.ArrayList;
          import java.util.List;

          public class SqlITCase extends StreamingMultipleProgramsTestBase {
          +
          + @Test
          + public void testRowRegister() throws Exception {
          — End diff –

          This feature should not be tested with (so many) expensive integration tests.
          Integration tests add a lot to the long build time and should be avoided if the feature can be tested with a more specific unit test. The `TableEnvironmentTest` has already many tests for `getFieldInfo`. We should add tests for `Row` there as well.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3849#discussion_r115339202

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) {
              case _ => throw new TableException(
                "Field reference expression or alias on field expression expected.")
            }

          + case r: RowTypeInfo => {
          + // r.getFieldNames().map(name =>
          + // (r.getFieldIndex(name),name))
          + exprs.zipWithIndex flatMap {
          + case (UnresolvedFieldReference(name), idx) =>
          + Some((idx, name))
          + case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
          + val idx = r.getFieldIndex(origName)
          + if (idx < 0) {
          + throw new TableException(s"$origName is not a field of type $r")
          + }
          + Some((idx, name))
          + case _ => throw new TableException(
          — End diff –

          We need to add the `TimeAttribute` case here as well:
          ```
          case _: TimeAttribute =>
          None
          ```
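          The field mapping discussed above can be illustrated outside Flink. The following is a hypothetical, simplified Java model, not Flink code: `FieldRef`, `AliasRef`, `TimeAttr`, `Other` and `fieldInfo` are illustrative stand-ins for `UnresolvedFieldReference`, `Alias`, `TimeAttribute` and the `RowTypeInfo` branch of `getFieldInfo`, and `IllegalArgumentException` replaces `TableException`. It assumes the same behavior as the diff plus the requested change: plain references map by position, aliases resolve the original field by name, time attributes are skipped, and anything else is rejected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the RowTypeInfo branch of getFieldInfo.
// None of these types are Flink APIs; they only mirror the shape of the diff.
public class RowFieldInfoSketch {

    interface Expr {}

    // Stand-in for UnresolvedFieldReference(name)
    static final class FieldRef implements Expr {
        final String name;
        FieldRef(String name) { this.name = name; }
    }

    // Stand-in for Alias(UnresolvedFieldReference(origName), name, _)
    static final class AliasRef implements Expr {
        final String origName;
        final String name;
        AliasRef(String origName, String name) { this.origName = origName; this.name = name; }
    }

    // Stand-in for a TimeAttribute expression (not a physical field)
    static final class TimeAttr implements Expr {
        final String name;
        TimeAttr(String name) { this.name = name; }
    }

    // Any other expression, which the branch must reject
    static final class Other implements Expr {}

    static final class FieldInfo {
        final int index;
        final String name;
        FieldInfo(int index, String name) { this.index = index; this.name = name; }
    }

    /** Maps expressions to (physical index, logical name) pairs for a row with the given field names. */
    static List<FieldInfo> fieldInfo(List<String> rowFieldNames, List<Expr> exprs) {
        List<FieldInfo> result = new ArrayList<>();
        for (int idx = 0; idx < exprs.size(); idx++) {
            Expr e = exprs.get(idx);
            if (e instanceof FieldRef) {
                // Plain reference: positional, like zipWithIndex in the diff
                result.add(new FieldInfo(idx, ((FieldRef) e).name));
            } else if (e instanceof AliasRef) {
                // Alias: look up the original field by name
                AliasRef a = (AliasRef) e;
                int fieldIdx = rowFieldNames.indexOf(a.origName);
                if (fieldIdx < 0) {
                    throw new IllegalArgumentException(
                        a.origName + " is not a field of type " + rowFieldNames);
                }
                result.add(new FieldInfo(fieldIdx, a.name));
            } else if (e instanceof TimeAttr) {
                // Time attribute: contributes no physical field (the `None` case)
                continue;
            } else {
                throw new IllegalArgumentException(
                    "Field reference expression or alias on field expression expected.");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> row = Arrays.asList("f0", "f1", "f2");
        List<FieldInfo> info = fieldInfo(row, Arrays.<Expr>asList(
            new FieldRef("a"), new AliasRef("f2", "c"), new TimeAttr("proctime")));
        for (FieldInfo fi : info) {
            System.out.println(fi.index + " -> " + fi.name);
        }
    }
}
```

          Running `main` prints `0 -> a` and `2 -> c`; the time attribute produces no entry. Because the mapping is pure logic, a check like this runs as a cheap unit test, in line with the suggestion to cover `Row` in `TableEnvironmentTest` rather than in integration tests.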

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3849#discussion_r115340600

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java —
          @@ -30,11 +30,125 @@
          import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
          import org.apache.flink.table.api.java.stream.utils.StreamTestData;
          import org.junit.Test;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeinfo.TypeInformation;
          +import org.apache.flink.api.java.typeutils.RowTypeInfo;

          import java.util.ArrayList;
          import java.util.List;

          public class SqlITCase extends StreamingMultipleProgramsTestBase {
          +
          + @Test
          + public void testRowRegister() throws Exception {
          — End diff –

          Adding one integration test for ingesting a `DataStream[Row]` to the Scala `SqlITCase` would be good.

          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3849

          @fhueske thanks for the review. I will make another pass and resubmit it.

          githubbot ASF GitHub Bot added a comment -

          Github user rtudoran commented on the issue:

          https://github.com/apache/flink/pull/3849

          @fhueske
          I resubmitted it - please have a look. I hope it is fine now

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3849

          Thanks for the update @rtudoran.
          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3849

          fhueske Fabian Hueske added a comment -

          Implemented for 1.3 with 28a89d1cac79063245bbc1ad9d262e3bc94b17b9
          Implemented for 1.4 with 6cd98a9b6f223bc665831e4ff3626c98aed9b272


            People

            • Assignee:
              rtudoran radu
              Reporter:
              rtudoran radu
            • Votes:
              0
              Watchers:
              3
