Flink / FLINK-6896

Creating a table from a POJO and writing it to a table sink fails

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.1, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      The following example fails at the sink. Stepping through in debug mode suggests that the ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row.

      Sample:

      TumblingWindow.java
      import java.io.Serializable;
      import java.util.ArrayList;
      import java.util.List;
      
      import org.apache.flink.core.fs.FileSystem.WriteMode;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
      import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.TableEnvironment;
      import org.apache.flink.table.api.java.StreamTableEnvironment;
      import org.apache.flink.table.api.java.Tumble;
      import org.apache.flink.table.sinks.CsvTableSink;
      import org.apache.flink.table.sinks.TableSink;
      import org.apache.flink.types.Row;
      
      public class TumblingWindow {
      
          public static void main(String[] args) throws Exception {
              List<Content> data = new ArrayList<Content>();
              data.add(new Content(1L, "Hi"));
              data.add(new Content(2L, "Hallo"));
              data.add(new Content(3L, "Hello"));
              data.add(new Content(4L, "Hello"));
              data.add(new Content(7L, "Hello"));
              data.add(new Content(8L, "Hello world"));
              data.add(new Content(16L, "Hello world"));
      
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      
              DataStream<Content> stream = env.fromCollection(data);
      
              DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                      new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
      
                          /**
                           * 
                           */
                          private static final long serialVersionUID = 410512296011057717L;
      
                          @Override
                          public long extractTimestamp(Content element) {
                              return element.getRecordTime();
                          }
      
                      });
      
              final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
      
              Table table = tableEnv.fromDataStream(stream2, "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
      
              Table windowTable = table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
                      .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
      
              //table.printSchema();
      
              TableSink<Row> windowSink = new CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
                      WriteMode.OVERWRITE);
              windowTable.writeToSink(windowSink);
      
              // tableEnv.toDataStream(windowTable, Row.class).print();
              env.execute();
          }
      
          public static class Content implements Serializable {
      
              /**
               * 
               */
              private static final long serialVersionUID = 1429246948772430441L;
      
              private String urlKey;
      
              private long recordTime;
              // private String recordTimeStr;
      
              private long httpGetMessageCount;
              private long httpPostMessageCount;
              private long uplink;
              private long downlink;
              private long statusCode;
              private long statusCodeCount;
      
              public Content() {
                  super();
              }
      
              public Content(long recordTime, String urlKey) {
                  super();
                  this.recordTime = recordTime;
                  this.urlKey = urlKey;
              }
      
              public String getUrlKey() {
                  return urlKey;
              }
      
              public void setUrlKey(String urlKey) {
                  this.urlKey = urlKey;
              }
      
              public long getRecordTime() {
                  return recordTime;
              }
      
              public void setRecordTime(long recordTime) {
                  this.recordTime = recordTime;
              }
      
              public long getHttpGetMessageCount() {
                  return httpGetMessageCount;
              }
      
              public void setHttpGetMessageCount(long httpGetMessageCount) {
                  this.httpGetMessageCount = httpGetMessageCount;
              }
      
              public long getHttpPostMessageCount() {
                  return httpPostMessageCount;
              }
      
              public void setHttpPostMessageCount(long httpPostMessageCount) {
                  this.httpPostMessageCount = httpPostMessageCount;
              }
      
              public long getUplink() {
                  return uplink;
              }
      
              public void setUplink(long uplink) {
                  this.uplink = uplink;
              }
      
              public long getDownlink() {
                  return downlink;
              }
      
              public void setDownlink(long downlink) {
                  this.downlink = downlink;
              }
      
              public long getStatusCode() {
                  return statusCode;
              }
      
              public void setStatusCode(long statusCode) {
                  this.statusCode = statusCode;
              }
      
              public long getStatusCodeCount() {
                  return statusCodeCount;
              }
      
              public void setStatusCodeCount(long statusCodeCount) {
                  this.statusCodeCount = statusCodeCount;
              }
      
          }
      
          private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
      
              /**
               * 
               */
              private static final long serialVersionUID = 1L;
      
              @Override
              public long extractTimestamp(Object[] element, long previousElementTimestamp) {
                  // TODO Auto-generated method stub
                  return (long) element[0];
              }
      
              @Override
              public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
                  return new Watermark(extractedTimestamp);
              }
      
          }
      }
      

      Exception trace

      Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
      	at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateFieldAccess(CodeGenerator.scala:1661)
      	at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateInputAccess(CodeGenerator.scala:1599)
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:875)
      	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:874)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728)
      	at scala.collection.immutable.Range.foreach(Range.scala:166)
      	at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:727)
      	at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:874)
      	at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.generatedConversionFunction(DataStreamScan.scala:36)
      	at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:36)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:63)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:119)
      	at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
      	at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
      	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
      	at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
      	at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
      	at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:66)
      
      
      1. debug.png (389 kB), attached by Mark You

        Issue Links

          Activity

          bonbonmark Mark You added a comment -

          Here is debug information

          sunjincheng121 sunjincheng added a comment -

          Hi Mark You, thanks for filing this JIRA. It seems to be an index-mapping error caused by the rowtime field processing logic.

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/4111

          FLINK-6896[table] Fix generate PojoType input result exression error.

          • [x] General
          • The pull request references the related JIRA issue ("FLINK-6896[table] Fix generate PojoType input result exression error.")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6896-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4111.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4111


          commit 88243f213739c3cbcaee7433ddb0e65f2e28628e
          Author: sunjincheng121 <sunjincheng121@gmail.com>
          Date: 2017-06-12T15:31:15Z

          FLINK-6896[table] Fix generate PojoType input result exression error.


          githubbot ASF GitHub Bot added a comment -

          Github user hustfxj commented on the issue:

          https://github.com/apache/flink/pull/4111

          @sunjincheng121 Thank you for the fix. Maybe it would be better to remove the following code in the function *generateFieldAccess*:
          ```
          val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
            fieldMapping(index)
          }
          ```
          This bug was introduced by FLINK-5884, due to the check of whether *input1Mapping* contains the field index in *generateConverterResultExpression* and *generateCorrelateAccessExprs*.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          Hi @hustfxj, thanks for your review. If I understand you correctly, you want to delete the code you mentioned above. I'm afraid we cannot delete that code: when the output type is `PojoTypeInfo`, we really need it. Can you double-check and let me know?

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user hustfxj commented on the issue:

          https://github.com/apache/flink/pull/4111

          @sunjincheng121 Sorry, I didn't get you. I think the columns will not be out of order if we delete that code, because the preceding `input1Mapping.contains` check already ensures the index is within the correct range. Let me know if I'm wrong. Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          For example (from test case):
          You can run `org.apache.flink.table.api.java.batch.TableEnvironmentITCase#testAsFromAndToPrivateFieldPojo`

          • input1:

          ```
          0=age(Integer)
          1=department(String)
          2=name(String)
          3=(Double)
          ```

          • input1Mapping:

          ```
          0=1
          1=0
          2=3
          3=2
          ```

          Using this PR's code logic, we get:

          input1AccessExprs:

          ```
          0=String
          1=Integer
          2=Double
          3=String
          ```
          If we instead use `for (i <- 0 until input1.getArity if input1Mapping.contains(i))` and delete the code you mentioned, we get:

          input1AccessExprs:

          ```
          0=Integer
          1=String
          2=String
          3=Double
          ```
          That is not what we want; the subsequent code will then throw the exception:
          `Incompatible types of expression and result type.`

          I have tried to express my point clearly, but feel free to tell me if anything is missing.

          Thanks,
          SunJincheng
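
          The reordering described above can be reproduced with a small standalone sketch (plain Java, not Flink's actual code; field names are omitted because the test case above only lists the types):

```java
import java.util.Arrays;

// Standalone sketch (not Flink code) of the two iteration strategies
// discussed above, using the types and mapping from the test case.
public class FieldMappingDemo {

    // PR logic: accessExprs[i] takes the type of physical field mapping[i],
    // so the declared (logical) order is preserved.
    static String[] mapByPosition(String[] physicalTypes, int[] mapping) {
        String[] result = new String[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            result[i] = physicalTypes[mapping[i]];
        }
        return result;
    }

    // Alternative: iterating 0 until arity and filtering by containment
    // walks the physical order, so the mapping's reordering is lost.
    static String[] mapByArity(String[] physicalTypes, int[] mapping) {
        String[] result = new String[physicalTypes.length];
        for (int i = 0; i < physicalTypes.length; i++) {
            result[i] = physicalTypes[i]; // mapping order is ignored
        }
        return result;
    }

    public static void main(String[] args) {
        String[] input1 = {"Integer", "String", "String", "Double"};
        int[] input1Mapping = {1, 0, 3, 2};

        System.out.println(Arrays.toString(mapByPosition(input1, input1Mapping)));
        System.out.println(Arrays.toString(mapByArity(input1, input1Mapping)));
    }
}
```

          With the mapping `{1, 0, 3, 2}`, iterating over mapping positions yields `[String, Integer, Double, String]` (the PR's result above), while iterating over the arity yields the physical order `[Integer, String, String, Double]`.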

          githubbot ASF GitHub Bot added a comment -

          Github user hustfxj commented on the issue:

          https://github.com/apache/flink/pull/4111

          @sunjincheng121 I get it. Thank you very much!

          githubbot ASF GitHub Bot added a comment -

          Github user hustfxj commented on the issue:

          https://github.com/apache/flink/pull/4111

          Looks good to me.
          +1 to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r121938446

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala —
          @@ -179,6 +180,36 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
          }

          @Test
          + def testEventTimeTumblingWindowWithPojoData(): Unit = {
          — End diff –

          The failure is unrelated to windows. Hence, I would not add the test to this class, and I would remove the window from it. `TimeAttributesITCase` is a better place to validate that converting a DataStream of POJOs works with time attributes.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r121937961

          — Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/Pojos.java —
          @@ -0,0 +1,61 @@
          +package org.apache.flink.table.api.java.stream.utils;
          — End diff –

          The Apache License header is missing. Please build PRs locally (or on Travis) to ensure that the build passes.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r121938625

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala —
          @@ -80,4 +83,28 @@ object StreamTestData

          { data.+=((5, 15L, 14, "KLM", 2L)) env.fromCollection(data) }

          +
          + def getPojo0DataStream(env: StreamExecutionEnvironment): DataStream[Pojo0] = {
          + val data = new mutable.MutableList[Pojo0]
          + val p0 = new Pojo0
          — End diff –

          We can make this more concise if we add two constructors to `Pojo0` (a default one without arguments and one taking all member variables).
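
          A sketch of what that suggestion could look like (hypothetical members, since `Pojo0`'s real fields are not shown in the diff):

```java
// Hypothetical sketch of the suggestion above; Pojo0's actual fields are not
// shown in the diff, so the members here are placeholders.
public class Pojo0 {
    private long id;
    private String name;

    // Default no-arg constructor: required for Flink to treat the class as a POJO.
    public Pojo0() {}

    // All-args constructor: lets test data be built in one expression.
    public Pojo0(long id, String name) {
        this.id = id;
        this.name = name;
    }

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

          Test data could then be built as `data += new Pojo0(1L, "Hi")` instead of one setter call per field.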

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          Thanks for your changes. @fhueske

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122152659

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -871,12 +871,24 @@ class CodeGenerator(
          returnType: TypeInformation[_ <: Any],
          resultFieldNames: Seq[String])
          : GeneratedExpression = {

          -  val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          -    yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +
          +  val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {

          — End diff –

          Maybe a better solution is to change this code block into

          ```scala
          val input1AccessExprs = for (i <- input1Mapping.indices)
            yield generateInputAccess(input1, input1Term, i, input1Mapping)

          val input2AccessExprs = input2 match {
            case Some(ti) => for (i <- input2Mapping.indices)
              yield generateInputAccess(ti, input2Term, i, input2Mapping)
            case None => Seq() // add nothing
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122141845

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -871,12 +871,24 @@ class CodeGenerator(
          returnType: TypeInformation[_ <: Any],
          resultFieldNames: Seq[String])
          : GeneratedExpression = {

          -  val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          -    yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +
          +  val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
          +    for (i <- 0 until input1Mapping.length)
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  } else {
          +    for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  }

             val input2AccessExprs = input2 match {
          -    case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
          -      yield generateInputAccess(ti, input2Term, i, input2Mapping)
          +    case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {

          — End diff –

          `input2` should be `ti`; here `input2` is an `Option`, which can never be a `PojoTypeInfo`.
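
          The same pitfall can be illustrated with `java.util.Optional` (a plain-Java analogy, not Flink's code): a runtime type check against the wrapper can only ever see the wrapper, so the payload must be unwrapped first, which is what the bound `ti` provides in the match above.

```java
import java.util.Optional;

// Plain-Java analogy for the review comment above: a type check on the
// Option/Optional wrapper never matches the payload's type.
public class WrapperTypeCheck {
    public static void main(String[] args) {
        Optional<Object> input2 = Optional.of(Integer.valueOf(42));

        // The wrapper itself is an Optional, so this is trivially true...
        System.out.println(input2 instanceof Optional);
        // ...while a check against the payload's type only succeeds once the
        // value is unwrapped (the role `ti` plays in the Scala match).
        boolean payloadIsInteger = input2.map(ti -> ti instanceof Integer).orElse(false);
        System.out.println(payloadIsInteger);
    }
}
```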

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122168412

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -871,12 +871,24 @@ class CodeGenerator(
          returnType: TypeInformation[_ <: Any],
          resultFieldNames: Seq[String])
          : GeneratedExpression = {

          -  val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          -    yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +
          +  val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
          +    for (i <- 0 until input1Mapping.length)
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  } else {
          +    for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  }

             val input2AccessExprs = input2 match {
          -    case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
          -      yield generateInputAccess(ti, input2Term, i, input2Mapping)
          +    case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {

          — End diff –

          Right, I noticed that as well

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122168658

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -871,12 +871,24 @@ class CodeGenerator(
          returnType: TypeInformation[_ <: Any],
          resultFieldNames: Seq[String])
          : GeneratedExpression = {

          -  val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          -    yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +
          +  val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {

          — End diff –

          I think I tried that, but it caused a couple of tests to fail.

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122168658 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation [_ <: Any] , resultFieldNames: Seq [String] ) : GeneratedExpression = { val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains ) yield generateInputAccess(input1, input1Term, i, input1Mapping) + + val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo [_] ]) { End diff – I think I tried that but it cause a couple of tests to fail.
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122176275

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —

          @@ -871,12 +871,24 @@ class CodeGenerator(
              returnType: TypeInformation[_ <: Any],
              resultFieldNames: Seq[String])
            : GeneratedExpression = {

          -  val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          -    yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +
          +  val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
          +    for (i <- 0 until input1Mapping.length)
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  } else {
          +    for (i <- 0 until input1.getArity if input1Mapping.contains(i))
          +      yield generateInputAccess(input1, input1Term, i, input1Mapping)
          +  }

             val input2AccessExprs = input2 match {
          -    case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
          -      yield generateInputAccess(ti, input2Term, i, input2Mapping)
          +    case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {

          End diff –

          @fhueske @wuchong thanks for trying to improve this PR. @wuchong, with your solution I also got two errors when I ran `TimeAttributesITCase`. Have you run all the tests locally?
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122176847

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          @wuchong I think the current PR works well, so I suggest you open a new PR to improve it. What do you think? @fhueske @wuchong
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122178152

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          Emm.. But it works fine locally for me. I pushed it to my branch, can you check it out? https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122180812

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          I think your branch looks good @wuchong. I tried a similar thing but did not change the `generateFieldAccess()` methods because I was afraid of side effects if we use the mapping for all kinds of types (instead of just POJOs).
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122194058

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          I checked the usages of `generateFieldAccess()` and the changes seem to be OK.
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122201344

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          @fhueske thanks for your review. So if Travis passes, I will merge the code?
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122205799

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          Thanks @wuchong, I checked it on my side. Nice.
          +1

          Best,
          SunJincheng
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4111#discussion_r122212537

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — (quoting the same hunk as the previous comments)

          End diff –

          Yes, @wuchong please merge
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/4111

          Travis reported a bug about composite types. Fixed it. Waiting for the new CI run to pass.
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4111

          @wuchong Can we wait one more day before merging this? I also want to take a look at it. It is related to FLINK-6881.
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/4111

          Please see my message in the 1.3.1 thread on the dev@ list.
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          @twalthr Thank you for paying attention to this PR. Feel free to leave your comments.
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          Hi @twalthr, I have tested this case using the first approach of FLINK-6896, without the `as` clause, and it works well. With the `as` clause, it can be fixed by adding a case match in `StreamTableEnvironment#validateAndExtractTimeAttributes`, as follows:
          `case (Alias(child, name, _), _) => fieldNames = name :: fieldNames` What do you think?
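The extra case match suggested above unwraps aliased expressions ("f as 'name") when collecting field names, so the alias contributes a name instead of being skipped. A hypothetical, simplified model of that idea in plain Java (the `Field`/`Alias` types here are stand-ins, not Flink's actual expression tree):

```java
import java.util.ArrayList;
import java.util.List;

public class AliasSketch {
    interface Expr {}
    record Field(String name) implements Expr {}
    record Alias(Expr child, String name) implements Expr {}

    // Collect output field names. Handling Alias is the proposed addition:
    // an aliased expression contributes its alias, not its child's name.
    static List<String> extractFieldNames(List<Expr> exprs) {
        List<String> fieldNames = new ArrayList<>();
        for (Expr e : exprs) {
            if (e instanceof Alias a) {
                fieldNames.add(a.name());
            } else if (e instanceof Field f) {
                fieldNames.add(f.name());
            }
        }
        return fieldNames;
    }

    public static void main(String[] args) {
        List<Expr> exprs = List.of(new Field("a"), new Alias(new Field("rowtime"), "ts"));
        System.out.println(extractFieldNames(exprs)); // [a, ts]
    }
}
```
Without the alias case, the aliased expression would fall through and its name would be missing from the collected list, which matches the failure mode described for the `as` clause.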
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4111

          @sunjincheng121 Sorry that I haven't responded earlier; I was on vacation last week. I found a solution that requires fewer CodeGenerator changes. I also found several issues (including Java expression parsing). I will open a PR shortly. We can test it with the test cases of this PR.
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4111

          Hi @twalthr, sounds good and makes sense to me. I can do some code review or check some test cases when you open the PR.

          Best,
          SunJincheng
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4111

          @sunjincheng121 I opened #4144. Feel free to review and test. I really hope that I covered all cases.
          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/4111

          Of course @twalthr, I haven't merged the code yet; thanks for your new PR. I will have a look tomorrow.
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4111
          twalthr Timo Walther added a comment -

          Fixed in 1.4: b9cce516707e14005767ef453da46851f43b9ebb
          Fixed in 1.3: 930216ef95cf7098d7f9f3ba7403dacc3b433817

            People

            • Assignee: sunjincheng121 sunjincheng
            • Reporter: bonbonmark Mark You
            • Votes: 0
            • Watchers: 5