Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      None of the following variants works:

      public static void main(String[] args) throws Exception {
      	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      
      	DataSet<String> text = ...
      	BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
      
      // 	Table t = tEnv.fromDataSet(text, "text").select("text AS line");
      // 	Table t = tEnv.fromDataSet(text, "text").as("line");
      	Table t = tEnv.fromDataSet(text, "text").select("text AS line").select("line AS line");
      
      	tEnv.toDataSet(t, MyPojo.class).print();
      }
      
      public static class MyPojo {
      	public String line;
      }
      
      Exception in thread "main" org.apache.flink.table.api.TableException: POJO does not define field name: text
      	at org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:85)
      	at org.apache.flink.table.typeutils.TypeConverter$$anonfun$2.apply(TypeConverter.scala:81)
      	at scala.collection.immutable.List.foreach(List.scala:318)
      	at org.apache.flink.table.typeutils.TypeConverter$.determineReturnType(TypeConverter.scala:81)
      	at org.apache.flink.table.plan.nodes.dataset.BatchScan.convertToExpectedType(BatchScan.scala:69)
      	at org.apache.flink.table.plan.nodes.dataset.DataSetScan.translateToPlan(DataSetScan.scala:61)
      	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:305)
      	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:289)
      	at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
      	at groupId.WordCount.main(WordCount.java:67)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
      


          Activity

          Jark Wu added a comment -

          I have looked into this issue. It only occurs when a renaming projection is applied and the expected output type is a POJO.

          The reason is that the `ProjectRemoveRule.INSTANCE` rule removes identity projections, so the renaming projection no longer exists in the final plan. I am not sure how to fix this; we can't simply drop the `ProjectRemoveRule.INSTANCE` rule.
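
          For illustration, the plan for the snippet in the description looks roughly like this before optimization (a sketch; node and table names are only indicative):

          LogicalProject(line=[$0])          <- identity projection that only renames "text" to "line"
            LogicalTableScan(table=[[...]])

          After `ProjectRemoveRule.INSTANCE` fires, only the scan remains, and with it the original field name "text", so the sink conversion looks for a POJO field "text" instead of "line".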

          Timo Walther added a comment -

          You are right, this issue is not trivial. What we actually need is a `ProjectRemoveRule` that pushes the field names into the input node, so that the field names do not get lost. Fabian Hueske and I already discussed a simpler architecture with conversion mappers at the beginning and end, and only operators handling row types in between. Maybe it is time for that now. We could then have an additional rule that merges two conversion mappers if nothing sits between them. A rough sketch of that idea follows below.
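
          A minimal sketch of such a sink-side conversion mapper (illustrative names only, reusing `MyPojo` from the description; this is not the actual implementation):

          import org.apache.flink.api.common.functions.MapFunction
          import org.apache.flink.types.Row

          // Operators would only ever see Row; the user's type is produced by one mapper at the sink.
          class RowToMyPojoMapper extends MapFunction[Row, MyPojo] {
            override def map(row: Row): MyPojo = {
              val pojo = new MyPojo
              // position 0 corresponds to the logical result field "line"
              pojo.line = row.getField(0).asInstanceOf[String]
              pojo
            }
          }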

          Even `Table t = tEnv.fromDataSet(text, "text").groupBy("text").select("text AS line");` fails, with a CodeGen exception.

          Timo Walther added a comment -

          I will assign this issue to myself.

          ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3277

          FLINK-5662 [table] Rework internal type handling of Table API

          This PR refactors the internal type handling of the Table API. It...

          • converts type to internal Row type immediately in Batch/StreamScan
          • uses only Row type within the operators
          • converts to target type at the very end in Batch/StreamTableEnvironment
          • removes interfaces for `expectedType`
          • removes the config parameter `efficient type usage`
          • fixes the generic types (now `Any` means "any type" and `Row` means a "row type")
          • fixes IDE warnings and unused imports where I found them
          • fixes the "Alias in front of output fails" issue

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-5662

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3277.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3277


          commit fdca898407a6e70800ff706c37ed58ea4244a9b0
          Author: twalthr <twalthr@apache.org>
          Date: 2017-02-06T16:18:08Z

          FLINK-5662 [table] Rework internal type handling of Table API


          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3277

          I forgot to test TableSinks. I will do this tomorrow.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3277

          Ready to be reviewed

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100269744

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -170,7 +171,7 @@ abstract class BatchTableEnvironment(
          private[flink] def explain(table: Table, extended: Boolean): String = {
          val ast = table.getRelNode
          val optimizedPlan = optimize(ast)

          -  val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
          +  val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new RowTypeInfo())

          — End diff –

          Why should we pass in the row type from the `ast` rather than from the `optimizedPlan`? I think `optimize` does not change the row type of the whole plan.

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100270246

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -285,28 +286,37 @@ abstract class BatchTableEnvironment(

             * @return The [[DataSet]] that corresponds to the translated [[Table]].
             */
            protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          -    val dataSetPlan = optimize(table.getRelNode)
          -    translate(dataSetPlan)
          +    val relNode = table.getRelNode
          +    val dataSetPlan = optimize(relNode)
          +    translate(dataSetPlan, relNode.getRowType)
            }

            /**
          -    * Translates a logical [[RelNode]] into a [[DataSet]].
          +    * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
               *
               * @param logicalPlan The root node of the relational expression tree.
               * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
               * @tparam A The type of the resulting [[DataSet]].
               * @return The [[DataSet]] that corresponds to the translated [[Table]].
               */
          -  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          +  protected def translate[A](
          +      logicalPlan: RelNode,
          +      logicalType: RelDataType)

          — End diff –

          Can logicalType be obtained directly from logicalPlan? (This is related to my comment on the `explain` method.)

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100270646

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -285,28 +286,37 @@ abstract class BatchTableEnvironment(

             * @return The [[DataSet]] that corresponds to the translated [[Table]].
             */
            protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          -    val dataSetPlan = optimize(table.getRelNode)
          -    translate(dataSetPlan)
          +    val relNode = table.getRelNode
          +    val dataSetPlan = optimize(relNode)
          +    translate(dataSetPlan, relNode.getRowType)
            }

            /**
          -    * Translates a logical [[RelNode]] into a [[DataSet]].
          +    * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
               *
               * @param logicalPlan The root node of the relational expression tree.
               * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
               * @tparam A The type of the resulting [[DataSet]].
               * @return The [[DataSet]] that corresponds to the translated [[Table]].
               */
          -  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          +  protected def translate[A](
          +      logicalPlan: RelNode,
          +      logicalType: RelDataType)

          — End diff –

          And if it is indeed needed, we should update the method comment as well.

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100271473

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -305,18 +307,25 @@ abstract class StreamTableEnvironment(

             * @tparam A The type of the resulting [[DataStream]].
             * @return The [[DataStream]] that corresponds to the translated [[Table]].
             */
          -  protected def translate[A]
          -      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
          +  protected def translate[A](
          +      logicalPlan: RelNode,
          +      logicalType: RelDataType)

          — End diff –

          Same comments as for `BatchTableEnvironment`.

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100296701

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala —
          @@ -19,26 +19,19 @@
          package org.apache.flink.table.plan.nodes.dataset

          import org.apache.calcite.rel.RelNode
          +import org.apache.flink.api.java.DataSet
          import org.apache.flink.table.api.BatchTableEnvironment
          import org.apache.flink.table.plan.nodes.FlinkRel
          -import org.apache.flink.api.common.typeinfo.TypeInformation
          -import org.apache.flink.api.java.DataSet
          +import org.apache.flink.types.Row

          trait DataSetRel extends RelNode with FlinkRel {

            /**
              * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
              *
          -    * @param tableEnv [[BatchTableEnvironment]] of the translated Table.
          -    * @param expectedType specifies the type the Flink operator should return. The type must
          -    *                     have the same arity as the result. For instance, if the
          -    *                     expected type is a RowTypeInfo this method will return a DataSet of
          -    *                     type Row. If the expected type is Tuple2, the operator will return
          -    *                     a Tuple2 if possible. Row otherwise.
          +    * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
              * @return DataSet of type expectedType or RowTypeInfo

          — End diff –

          DataSet of Row is enough

          ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100297140

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala —
          @@ -19,27 +19,20 @@
          package org.apache.flink.table.plan.nodes.datastream

          import org.apache.calcite.rel.RelNode
          -import org.apache.flink.api.common.typeinfo.TypeInformation
          -import org.apache.flink.table.plan.nodes.FlinkRel
          import org.apache.flink.streaming.api.datastream.DataStream
          import org.apache.flink.table.api.StreamTableEnvironment
          +import org.apache.flink.table.plan.nodes.FlinkRel
          +import org.apache.flink.types.Row

          trait DataStreamRel extends RelNode with FlinkRel {

            /**
              * Translates the FlinkRelNode into a Flink operator.
              *
              * @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
          -    * @param expectedType specifies the type the Flink operator should return. The type must
          -    *                     have the same arity as the result. For instance, if the
          -    *                     expected type is a RowTypeInfo this method will return a DataSet of
          -    *                     type Row. If the expected type is Tuple2, the operator will return
          -    *                     a Tuple2 if possible. Row otherwise.
              * @return DataStream of type expectedType or RowTypeInfo

          — End diff –

          DataStream of Row

          ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3277

          Hi @twalthr, the PR looks quite good! +1 to what @KurtYoung said. I have a few minor comments which I will add soon.

          Thanks, Fabian

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100708770

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala —
          @@ -0,0 +1,98 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.nodes
          +
          +import org.apache.flink.api.common.functions.MapFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.table.api.TableConfig
          +import org.apache.flink.table.codegen.CodeGenerator
          +import org.apache.flink.table.runtime.MapRunner
          +import org.apache.flink.types.Row
          +
          +/**
          + * Common class for batch and stream scans.
          + */
          +trait CommonScan {
          +
          + /**
          + * We check if the input type is exactly the same as the internal row type.
          + * A conversion is necessary if types differ or object have to be unboxed
          + * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents).
          + */
          + private[flink] def needsConversion(
          + externalTypeInfo: TypeInformation[Any],
          + internalTypeInfo: TypeInformation[Row])
          + : Boolean = {
          +
          + if (externalTypeInfo == internalTypeInfo) {
          + val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo]
          + var containsBoxedTypes = false
          +      // TODO enable these lines for FLINK-5429
          +      // for (i <- rowTypeInfo.getArity) {
          +      //   val field = rowTypeInfo.getTypeAt
          +      //   if (field == SqlTimeTypeInfo.DATE ||
          +      //       field == SqlTimeTypeInfo.TIME ||
          +      //       field == SqlTimeTypeInfo.TIMESTAMP) {
          +      //     containsBoxedTypes = true
          +      //   }
          +      // }
          +      containsBoxedTypes
          +    } else {
          +      true
          +    }
          +  }
          +
          + private[flink] def getConversionMapper(
          + config: TableConfig,
          + inputType: TypeInformation[Any],
          + expectedType: TypeInformation[Row],
          + conversionOperatorName: String,
          + fieldNames: Seq[String],
          + inputPojoFieldMapping: Option[Array[Int]] = None)
          + : MapFunction[Any, Row] = {
          +
          + val generator = new CodeGenerator(
          + config,
          + false,
          + inputType,
          + None,
          + inputPojoFieldMapping)
          + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
          +
          +    val body =
          +      s"""
          +        |${conversion.code}
          +        |return ${conversion.resultTerm};
          +        |""".stripMargin
          +
          + val genFunction = generator.generateFunction(
          + conversionOperatorName,
          + classOf[MapFunction[Row, Row]],
          — End diff –

          Shouldn't the input type of the generated `MapFunction` be `Any`, like the input type of the wrapping `MapRunner`?
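
          That is (only a sketch of the suggested change):

          classOf[MapFunction[Any, Row]]  // instead of classOf[MapFunction[Row, Row]], matching the MapRunner input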

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100708516

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,113 @@ abstract class TableEnvironment(val config: TableConfig)

               (fieldNames.toArray, fieldIndexes.toArray)
             }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          + logicalRowType: RelDataType,
          + expectedTypeInfo: TypeInformation[T],
          + functionName: String)
          + : Option[MapFunction[Row, T]] = {
          +
          + // validate that at least the field types of physical and logical type match
          + // we do that here to make sure that plan translation was correct
          + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
          +    if (physicalRowTypeInfo != logicalRowTypeInfo) {
          +      throw TableException("The field types of physical and logical row types do not match." +
          +        "This is a bug and should not happen. Please file an issue.")
          +    }
          +
          +    // expected type is a row, no conversion needed
          +    // TODO this logic will change with FLINK-5429
          +    if (expectedTypeInfo.getTypeClass == classOf[Row]) {
          +      return None
          +    }
          +
          +    // convert to type information
          +    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
          +      FlinkTypeFactory.toTypeInfo(relDataType.getType)
          +    }
          +    // field names
          +    val logicalFieldNames = logicalRowType.getFieldNames.asScala
          +
          +    // validate expected type
          +    if (expectedTypeInfo.getArity != logicalFieldTypes.length) {
          +      throw new TableException("Arity of result does not match expected type.")
          +    }
          +    expectedTypeInfo match {
          +
          +      // POJO type expected
          +      case pt: PojoTypeInfo[_] =>
          +        logicalFieldNames.zip(logicalFieldTypes) foreach {
          +          case (fName, fType) =>
          +            val pojoIdx = pt.getFieldIndex(fName)
          +            if (pojoIdx < 0) {
          +              throw new TableException(s"POJO does not define field name: $fName")
          +            }
          +            val expectedTypeInfo = pt.getTypeAt(pojoIdx)
          +            if (fType != expectedTypeInfo) {
          +              throw new TableException(s"Result field does not match expected type. " +
          +                s"Expected: $expectedTypeInfo; Actual: $fType")
          +            }
          +        }
          +
          +      // Tuple/Case class type expected
          +      case ct: CompositeType[_] =>
          +        logicalFieldTypes.zipWithIndex foreach {
          +          case (fieldTypeInfo, i) =>
          +            val expectedTypeInfo = ct.getTypeAt
          +            if (fieldTypeInfo != expectedTypeInfo) {
          +              throw new TableException(s"Result field does not match expected type. " +
          +                s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
          +            }
          +        }
          +
          +      // Atomic type expected
          +      case at: AtomicType[_] =>
          +        val fieldTypeInfo = logicalFieldTypes.head
          — End diff –

          Add a check that `logicalFieldTypes.size() == 1`
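
          Something along these lines (only a sketch of the suggested check, not the final wording):

          case at: AtomicType[_] =>
            if (logicalFieldTypes.size != 1) {
              throw new TableException("Requested result type is an atomic type but result has more than a single field.")
            }
            val fieldTypeInfo = logicalFieldTypes.head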

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100708670

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala —
          @@ -0,0 +1,98 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.nodes
          +
          +import org.apache.flink.api.common.functions.MapFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.table.api.TableConfig
          +import org.apache.flink.table.codegen.CodeGenerator
          +import org.apache.flink.table.runtime.MapRunner
          +import org.apache.flink.types.Row
          +
          +/**
          + * Common class for batch and stream scans.
          + */
          +trait CommonScan {
          +
          + /**
          + * We check if the input type is exactly the same as the internal row type.
          + * A conversion is necessary if types differ or object have to be unboxed
          + * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents).
          + */
          + private[flink] def needsConversion(
          + externalTypeInfo: TypeInformation[Any],
          + internalTypeInfo: TypeInformation[Row])
          + : Boolean = {
          +
          + if (externalTypeInfo == internalTypeInfo) {
          + val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo]
          + var containsBoxedTypes = false
          + // TODO enable these lines for FLINK-5429
          — End diff –

          We should either address FLINK-5429 in this PR or remove the related code.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100709080

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,113 @@ abstract class TableEnvironment(val config: TableConfig)

               (fieldNames.toArray, fieldIndexes.toArray)
             }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          — End diff –

          `physicalRowTypeInfo` is not really needed here, right? It can be generated from the `logicalRowType`.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100708947

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala —
          @@ -0,0 +1,98 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.nodes
          +
          +import org.apache.flink.api.common.functions.MapFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.table.api.TableConfig
          +import org.apache.flink.table.codegen.CodeGenerator
          +import org.apache.flink.table.runtime.MapRunner
          +import org.apache.flink.types.Row
          +
          +/**
          + * Common class for batch and stream scans.
          + */
          +trait CommonScan {
          +
          + /**
          + * We check if the input type is exactly the same as the internal row type.
          + * A conversion is necessary if types differ or object have to be unboxed
          + * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents).
          + */
          + private[flink] def needsConversion(
          + externalTypeInfo: TypeInformation[Any],
          + internalTypeInfo: TypeInformation[Row])
          + : Boolean = {
          +
          + if (externalTypeInfo == internalTypeInfo) {
          + val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo]
          + var containsBoxedTypes = false
          + // TODO enable these lines for FLINK-5429
          — End diff –

          Not sure if this would work as expected. Shouldn't we rather change `FlinkTypeFactory.toInternalRowType` to use `long`/`int` for date and time fields? If the types are changed, the internalTypeInfo (which is expected to be equal to the externalTypeInfo) would need to change too.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100708467

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,113 @@ abstract class TableEnvironment(val config: TableConfig)

               (fieldNames.toArray, fieldIndexes.toArray)
             }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          + logicalRowType: RelDataType,
          + expectedTypeInfo: TypeInformation[T],
          + functionName: String)
          + : Option[MapFunction[Row, T]] = {
          +
          + // validate that at least the field types of physical and logical type match
          + // we do that here to make sure that plan translation was correct
          + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
          +    if (physicalRowTypeInfo != logicalRowTypeInfo) {
          +      throw TableException("The field types of physical and logical row types do not match." +
          +        "This is a bug and should not happen. Please file an issue.")
          +    }
          +
          +    // expected type is a row, no conversion needed
          +    // TODO this logic will change with FLINK-5429
          +    if (expectedTypeInfo.getTypeClass == classOf[Row]) {
          +      return None
          +    }
          +
          +    // convert to type information
          +    val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
          +      FlinkTypeFactory.toTypeInfo(relDataType.getType)
          +    }
          +    // field names
          +    val logicalFieldNames = logicalRowType.getFieldNames.asScala
          +
          +    // validate expected type
          +    if (expectedTypeInfo.getArity != logicalFieldTypes.length) {
          +      throw new TableException("Arity of result does not match expected type.")
          +    }
          +    expectedTypeInfo match {
          +
          +      // POJO type expected
          +      case pt: PojoTypeInfo[_] =>
          +        logicalFieldNames.zip(logicalFieldTypes) foreach {
          +          case (fName, fType) =>
          +            val pojoIdx = pt.getFieldIndex(fName)
          +            if (pojoIdx < 0) {
          +              throw new TableException(s"POJO does not define field name: $fName")
          +            }
          +            val expectedTypeInfo = pt.getTypeAt(pojoIdx)
          +            if (fType != expectedTypeInfo) {
          +              throw new TableException(s"Result field does not match expected type. " +
          +                s"Expected: $expectedTypeInfo; Actual: $fType")
          +            }
          +        }
          +
          +      // Tuple/Case class type expected
          +      case ct: CompositeType[_] =>
          — End diff –

          `CompositeType` -> `TupleTypeInfoBase`. It covers tuples, case classes, and Row.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100749561

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -170,7 +171,7 @@ abstract class BatchTableEnvironment(
          private[flink] def explain(table: Table, extended: Boolean): String = {
          val ast = table.getRelNode
          val optimizedPlan = optimize(ast)

          -  val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
          +  val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new RowTypeInfo())

          — End diff –

          This is the problem described in the issue description of FLINK-5662. The optimization does not change the field types, but it can change their names. This might be a Calcite bug, but until it is resolved we need the logical type for the sink conversion.
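
          To make the failure mode concrete, here is a hedged sketch (the mapper below is hypothetical and only illustrates the idea; it is not the code generated by this PR): because `ProjectRemoveRule` can drop the renaming project during optimization, the physical Row keeps the original field order, and only the logical row type still knows the final names (e.g. "line" instead of "text") that the sink conversion has to match against the POJO fields.

          import org.apache.flink.api.common.functions.MapFunction
          import org.apache.flink.api.java.typeutils.PojoTypeInfo
          import org.apache.flink.types.Row

          // Hypothetical illustration: copy Row fields into a POJO using the field names
          // of the logical (pre-optimization) row type; assumes public POJO fields and a
          // public no-argument constructor, as in the MyPojo example of this issue.
          class RowToPojoSketch[T <: AnyRef](pojoType: PojoTypeInfo[T], logicalFieldNames: Seq[String])
            extends MapFunction[Row, T] {

            override def map(row: Row): T = {
              val pojo = pojoType.getTypeClass.newInstance()
              logicalFieldNames.zipWithIndex.foreach { case (name, i) =>
                // resolve the POJO field by the logical name, not by the physical name
                val pojoIdx = pojoType.getFieldIndex(name)
                pojoType.getPojoFieldAt(pojoIdx).getField.set(pojo, row.getField(i))
              }
              pojo
            }
          }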

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100749841

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -285,28 +286,37 @@ abstract class BatchTableEnvironment(

          * @return The [[DataSet]] that corresponds to the translated [[Table]].
          */
          protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          - val dataSetPlan = optimize(table.getRelNode)
          - translate(dataSetPlan)
          + val relNode = table.getRelNode
          + val dataSetPlan = optimize(relNode)
          + translate(dataSetPlan, relNode.getRowType)
          }

          /**
          - * Translates a logical [[RelNode]] into a [[DataSet]].
          + * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
          *
          * @param logicalPlan The root node of the relational expression tree.
          * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
          * @tparam A The type of the resulting [[DataSet]].
          * @return The [[DataSet]] that corresponds to the translated [[Table]].
          */
          - protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
          + protected def translate[A](
          + logicalPlan: RelNode,
          + logicalType: RelDataType)
          — End diff –

          See my explanation above. Thanks, I will add some comments.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100751589

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,113 @@ abstract class TableEnvironment(val config: TableConfig)

          (fieldNames.toArray, fieldIndexes.toArray)
          }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          — End diff –

          Yes, I just thought it makes sense to cross-check the logical type path and the translation path somewhere. I will add more comments to this method.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100767364

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala —
          @@ -0,0 +1,98 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.plan.nodes
          +
          +import org.apache.flink.api.common.functions.MapFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.table.api.TableConfig
          +import org.apache.flink.table.codegen.CodeGenerator
          +import org.apache.flink.table.runtime.MapRunner
          +import org.apache.flink.types.Row
          +
          +/**
          + * Common class for batch and stream scans.
          + */
          +trait CommonScan {
          +
          + /**
          + * We check if the input type is exactly the same as the internal row type.
          + * A conversion is necessary if types differ or object have to be unboxed
          + * (i.e. Date, Time, Timestamp need to be converted into their primitive equivalents).
          + */
          + private[flink] def needsConversion(
          + externalTypeInfo: TypeInformation[Any],
          + internalTypeInfo: TypeInformation[Row])
          + : Boolean = {
          +
          + if (externalTypeInfo == internalTypeInfo) {
          + val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo]
          + var containsBoxedTypes = false
          + // TODO enable these lines for FLINK-5429
          — End diff –

          I removed the lines for now. Actually, I wanted to change the internal row type to `long/int` in this PR but then I realized the complexity. I think we should do that as part of FLINK-5429.
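
          As an illustration of the deferred FLINK-5429 check, a rough sketch (not the merged code; the method and object names are placeholders) of how boxed time types could still force a conversion even when the external type equals the internal row type:

          import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
          import org.apache.flink.api.java.typeutils.RowTypeInfo
          import org.apache.flink.types.Row

          object NeedsConversionSketch {
            // Sketch only: a conversion is needed if the types differ, or if the row
            // contains Date/Time/Timestamp fields that should be unboxed to primitives.
            def needsConversion(
                externalTypeInfo: TypeInformation[Any],
                internalTypeInfo: TypeInformation[Row]): Boolean = {
              if (externalTypeInfo != internalTypeInfo) {
                true
              } else {
                val rowTypeInfo = externalTypeInfo.asInstanceOf[RowTypeInfo]
                (0 until rowTypeInfo.getArity).exists { i =>
                  rowTypeInfo.getTypeAt(i) match {
                    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP => true
                    case _ => false
                  }
                }
              }
            }
          }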

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3277

          Thanks for reviewing @KurtYoung and @fhueske. I updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100772114

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,123 @@ abstract class TableEnvironment(val config: TableConfig)

          (fieldNames.toArray, fieldIndexes.toArray)
          }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + *
          + * @param physicalRowTypeInfo the input of the sink
          + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
          + * @param expectedTypeInfo the outptu type of the sink
          + * @param functionName name of the map function. Must not be unique but has to be a
          + * valid Java class identifier.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          + logicalRowType: RelDataType,
          + expectedTypeInfo: TypeInformation[T],
          — End diff –

          Change `expected` to `requested` in the error messages and variable names of this method?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100772133

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,123 @@ abstract class TableEnvironment(val config: TableConfig)

          (fieldNames.toArray, fieldIndexes.toArray)
          }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + *
          + * @param physicalRowTypeInfo the input of the sink
          + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
          + * @param expectedTypeInfo the outptu type of the sink
          — End diff –

          typo

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100771883

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -428,6 +431,123 @@ abstract class TableEnvironment(val config: TableConfig)

          (fieldNames.toArray, fieldIndexes.toArray)
          }

          + /**
          + * Creates a final converter that maps the internal row type to external type.
          + *
          + * @param physicalRowTypeInfo the input of the sink
          + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
          + * @param expectedTypeInfo the outptu type of the sink
          + * @param functionName name of the map function. Must not be unique but has to be a
          + * valid Java class identifier.
          + */
          + protected def sinkConversion[T](
          + physicalRowTypeInfo: TypeInformation[Row],
          + logicalRowType: RelDataType,
          + expectedTypeInfo: TypeInformation[T],
          + functionName: String)
          + : Option[MapFunction[Row, T]] = {
          +
          + // validate that at least the field types of physical and logical type match
          + // we do that here to make sure that plan translation was correct
          + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
          + if (physicalRowTypeInfo != logicalRowTypeInfo) {
          + throw TableException("The field types of physical and logical row types do not match." +
          + "This is a bug and should not happen. Please file an issue.")
          + }
          +
          + // expected type is a row, no conversion needed
          + // TODO this logic will change with FLINK-5429
          + if (expectedTypeInfo.getTypeClass == classOf[Row]) {
          + return None
          + }
          +
          + // convert to type information
          + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
          + FlinkTypeFactory.toTypeInfo(relDataType.getType)
          + }
          + // field names
          + val logicalFieldNames = logicalRowType.getFieldNames.asScala
          +
          + // validate expected type
          + if (expectedTypeInfo.getArity != logicalFieldTypes.length) {
          + throw new TableException("Arity of result does not match expected type.")
          + }
          + expectedTypeInfo match {
          +
          + // POJO type expected
          + case pt: PojoTypeInfo[_] =>
          + logicalFieldNames.zip(logicalFieldTypes) foreach {
          + case (fName, fType) =>
          + val pojoIdx = pt.getFieldIndex(fName)
          + if (pojoIdx < 0) {
          + throw new TableException(s"POJO does not define field name: $fName")
          + }
          + val expectedTypeInfo = pt.getTypeAt(pojoIdx)
          + if (fType != expectedTypeInfo) {
          + throw new TableException(s"Result field does not match expected type. " +
          + s"Expected: $expectedTypeInfo; Actual: $fType")
          + }
          + }
          +
          + // Tuple/Case class type expected
          + case ct: TupleTypeInfoBase[_] =>
          + logicalFieldTypes.zipWithIndex foreach {
          + case (fieldTypeInfo, i) =>
          + val expectedTypeInfo = ct.getTypeAt(i)
          + if (fieldTypeInfo != expectedTypeInfo) {
          + throw new TableException(s"Result field does not match expected type. " +
          + s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
          + }
          + }
          +
          + // Atomic type expected
          + case at: AtomicType[_] =>
          + if (logicalFieldTypes.size != 1) {
          + throw new TableException(s"Result does not have a single field, " +
          — End diff –

          Improve error message: "Requested result type is an atomic type but result has more than a single field."
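
          For reference, a hypothetical end-to-end snippet (the program and field names are invented for illustration) that runs into this branch: the table has two fields but an atomic result type is requested, so the improved message above is what the user would see.

          import org.apache.flink.api.scala._
          import org.apache.flink.table.api.TableEnvironment
          import org.apache.flink.table.api.scala._

          object AtomicTypeMismatchExample {
            def main(args: Array[String]): Unit = {
              val env = ExecutionEnvironment.getExecutionEnvironment
              val tEnv = TableEnvironment.getTableEnvironment(env)

              val words = env.fromElements(("hello", 1L), ("world", 2L))
              val table = tEnv.fromDataSet(words, 'word, 'count)

              // requesting an atomic type (String) from a two-field result is expected
              // to fail with the error message discussed above
              val result = tEnv.toDataSet[String](table)
              result.print()
            }
          }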

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3277#discussion_r100807305

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -170,7 +171,7 @@ abstract class BatchTableEnvironment(
          private[flink] def explain(table: Table, extended: Boolean): String = {
          val ast = table.getRelNode
          val optimizedPlan = optimize(ast)

          - val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
          + val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new RowTypeInfo())
          — End diff –

          Ok, makes sense to me.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3277

          Merging this...

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 6bc6b225e55095eb8797db2903b0546410e7fdd9

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr closed the pull request at:

          https://github.com/apache/flink/pull/3277


            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              twalthr Timo Walther
            • Votes:
              0
              Watchers:
              3
