  Flink / FLINK-6881

Creating a table from a POJO and defining a time attribute fails

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.1, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Creating a table from a DataStream of POJOs fails when the user tries to define a rowtime attribute.

      There are multiple causes, located in ExpressionParser as well as in StreamTableEnvironment#validateAndExtractTimeAttributes.

      See also: https://stackoverflow.com/questions/44448022/apache-flink-1-3-table-api-rowtime-strange-behavior
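The patch reviewed in the comments below replaces the positional index of a field with pti.getFieldIndex(...) when the stream type is a POJO. The hypothetical Java sketch below illustrates why those two indices can differ. It assumes that, as in Flink's PojoTypeInfo to the best of our knowledge, POJO fields are ordered alphabetically by name regardless of the order used in the fromDataStream expression string. PojoFieldOrder and its methods are invented for illustration and use no Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of POJO field ordering; this is not Flink code.
final class PojoFieldOrder {

    private PojoFieldOrder() {}

    // Assumption: the POJO type orders its fields alphabetically by name.
    static List<String> sortedFieldNames(List<String> fieldNames) {
        List<String> sorted = new ArrayList<>(fieldNames);
        Collections.sort(sorted);
        return sorted;
    }

    // Index of a field in the name-ordered POJO type, or -1 if no field has that name.
    static int pojoIndex(List<String> fieldNames, String name) {
        return sortedFieldNames(fieldNames).indexOf(name);
    }

    // Position at which the user wrote the field in the expression list.
    static int expressionIndex(List<String> expressionOrder, String name) {
        return expressionOrder.indexOf(name);
    }
}
```

With the field names from the reproduction in the first comment (urlKey, statusCodeCount, recordTime), recordTime sits at expression position 2 but at POJO index 0, and POJO index 2 belongs to urlKey. Using the position to look up the field type would therefore inspect the wrong field.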


          Activity

          twalthr Timo Walther added a comment -

          This does not work either:

                  Table table = tableEnv.fromDataStream(stream2,
                          "urlKey as urlKey," +
                          "statusCodeCount as statusCodeCount," +
                          "recordTime as rowtime.rowtime");
          

          Result:

          Exception in thread "main" java.lang.IndexOutOfBoundsException: 2
          	at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
          	at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
          	at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:152)
          	at org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:144)
          
          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/4144

          FLINK-6881 FLINK-6896 [table] Creating a table from a POJO and defining a time attribute fails

          This PR fixes several issues with POJOs and time attributes.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-6881_NEW

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4144.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4144


          commit 0f5793159ca97d6d7e9f8a4b9fab3a3a2479fab8
          Author: twalthr <twalthr@apache.org>
          Date: 2017-06-19T15:06:44Z

          FLINK-6881 FLINK-6896 [table] Creating a table from a POJO and defining a time attribute fails


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4144#discussion_r122829892

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
               var rowtime: Option[(Int, String)] = None
               var proctime: Option[(Int, String)] = None

          -    exprs.zipWithIndex.foreach {
          -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
          -        if (rowtime.isDefined) {
          +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
          +      if (rowtime.isDefined) {
          +        throw new TableException(
          +          "The rowtime attribute can only be defined once in a table schema.")
          +      } else {
          +        val mappedIdx = streamType match {
          +          case pti: PojoTypeInfo[_] =>
          +            pti.getFieldIndex(origName.getOrElse(name))
          +          case _ => idx;
          +        }
          +        // check type of field that is replaced
          +        if (mappedIdx < fieldTypes.length &&
          +          !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
          +            TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
                     throw new TableException(
          -            "The rowtime attribute can only be defined once in a table schema.")
          -        } else {
          -          // check type of field that is replaced
          -          if (idx < fieldTypes.length &&
          -            !(TypeCheckUtils.isLong(fieldTypes(idx)) ||
          -              TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
          -            throw new TableException(
          -              "The rowtime attribute can only be replace a field with a valid time type, such as " +
          -              "Timestamp or Long.")
          -          }
          -          rowtime = Some(idx, name)
          +            s"The rowtime attribute can only be replace a field with a valid time type, " +
          — End diff –

          remove "be" -> `"The rowtime attribute can only replace a field with ..."`

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4144#discussion_r122852900

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
               var rowtime: Option[(Int, String)] = None
               var proctime: Option[(Int, String)] = None

          -    exprs.zipWithIndex.foreach {
          -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
          -        if (rowtime.isDefined) {
          +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
          +      if (rowtime.isDefined) {
          +        throw new TableException(
          +          "The rowtime attribute can only be defined once in a table schema.")
          +      } else {
          +        val mappedIdx = streamType match {
          +          case pti: PojoTypeInfo[_] =>
          +            pti.getFieldIndex(origName.getOrElse(name))
          — End diff –

          When a user misspells the rowtime field name of the POJO, e.g.
          `(recordTimeA as rowtime).rowtime` where the correct name is `recordTime`,
          they get the following exception:
          ```
          Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
          at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractRowtime$1(StreamTableEnvironment.scala:453)
          at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:484)
          ```
          I suggest that:
          1. We check the rowtime field name of the POJO as early as possible.
          2. We check that the index is >= 0 and, if it is not, throw an exception with a clear error message.
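Combining the checks from the diff with the suggestion above, rowtime extraction could look roughly like the following minimal Java sketch. It mirrors the Scala diff only loosely: RowtimeExtractor and its FieldType enum are hypothetical, IllegalArgumentException stands in for Flink's TableException, List.indexOf stands in for PojoTypeInfo#getFieldIndex (both return -1 for an unknown name), and the mappedIdx < 0 guard is the reviewer's suggestion, not code taken from the pull request.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified rendering of the rowtime checks; this is not Flink code.
final class RowtimeExtractor {

    enum FieldType { LONG, TIMESTAMP, STRING }

    private final boolean isPojo;
    private final List<String> fieldNames;    // physical field names, in type index order
    private final List<FieldType> fieldTypes; // physical field types, same order
    private String rowtimeName;               // null until a rowtime attribute is defined

    RowtimeExtractor(boolean isPojo, List<String> fieldNames, List<FieldType> fieldTypes) {
        this.isPojo = isPojo;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    /** Returns the index of the physical field backing the rowtime attribute. */
    int extractRowtime(int idx, String name, Optional<String> origName) {
        if (rowtimeName != null) {
            throw new IllegalArgumentException(
                "The rowtime attribute can only be defined once in a table schema.");
        }
        // POJO fields are resolved by their (original) name, other types by position.
        String lookupName = origName.orElse(name);
        int mappedIdx = isPojo ? fieldNames.indexOf(lookupName) : idx;
        // Suggested guard: an unknown POJO field name yields -1, so fail with a
        // clear message instead of an index-out-of-bounds error.
        if (mappedIdx < 0) {
            throw new IllegalArgumentException(
                "Field '" + lookupName + "' could not be found in the POJO fields " + fieldNames + ".");
        }
        // Check the type of the field that is replaced; indices past the physical
        // fields are not type-checked here.
        if (mappedIdx < fieldTypes.size()) {
            FieldType type = fieldTypes.get(mappedIdx);
            if (type != FieldType.LONG && type != FieldType.TIMESTAMP) {
                throw new IllegalArgumentException(
                    "The rowtime attribute can only replace a field with a valid time type, "
                        + "such as Timestamp or Long.");
            }
        }
        rowtimeName = name;
        return mappedIdx;
    }
}
```

Checking the index before any array access means a misspelled name such as recordTimeA is reported by name, which is the behavior the suggestion asks for.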

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4144#discussion_r122922340

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
               var rowtime: Option[(Int, String)] = None
               var proctime: Option[(Int, String)] = None

          -    exprs.zipWithIndex.foreach {
          -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
          -        if (rowtime.isDefined) {
          +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
          +      if (rowtime.isDefined) {
          +        throw new TableException(
          +          "The rowtime attribute can only be defined once in a table schema.")
          +      } else {
          +        val mappedIdx = streamType match {
          +          case pti: PojoTypeInfo[_] =>
          +            pti.getFieldIndex(origName.getOrElse(name))
          — End diff –

          Thanks @sunjincheng121. Good point.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/4144

          I will merge this...

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/4144

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4144#discussion_r122924846

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -1666,14 +1666,8 @@ class CodeGenerator(
          : GeneratedExpression = {
          — End diff –

          The `fieldMapping` is never used; should we remove it?

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4144#discussion_r122937741

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -1666,14 +1666,8 @@ class CodeGenerator(
          : GeneratedExpression = {
          — End diff –

          We could. Maybe I have to check that again.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4144

          twalthr Timo Walther added a comment -

          Fixed in 1.4: b9cce516707e14005767ef453da46851f43b9ebb
          Fixed in 1.3: 930216ef95cf7098d7f9f3ba7403dacc3b433817


            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              twalthr Timo Walther
            • Votes:
              0
              Watchers:
              5
