FLINK-5884

Integrate time indicators for Table API & SQL

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Critical
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      We already discussed the need for a proper integration of time indicators (event-time or processing-time) for both the Table API & SQL on the ML:

      http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-SQL-indicators-for-event-and-processing-time-tp15927.html

This issue will track the progress. I will work on a design document that describes how we can solve this issue.

        Issue Links

          Activity

githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3808

          FLINK-5884 [table] Integrate time indicators for Table API & SQL

This PR introduces time indicators in the Table & SQL API. The PR splits the current row type into a logical and a physical row type. Time indicators are part of every logical row and stay logical if they are just forwarded and not accessed for calculations. If a time attribute is passed to a function or arithmetic expression, it will be materialized. A ProcessFunction can access the meta timestamp for materialization (code generation for this still has to be implemented). Every logical plan now includes time attributes and is consistent:

```
val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

val windowedTable = table
  .window(Session withGap 7.milli on 'long as 'w)
  .groupBy('w, 'string)
  .select('string, 'int.count)

val expected = unaryNode(
  "DataStreamAggregate",
  streamTableNode(0),
  term("groupBy", "string"),
  term(
    "window",
    SessionGroupWindow(
      WindowReference("w"),
      'long,
      7.milli)),
  term("select", "string", "COUNT(int) AS TMP_0")
)
```

Main changes of the current solution:

• Processing-time and rowtime (event-time) indicators can be named arbitrarily.
• They can be defined as follows: `stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)` or `stream.toTable(tEnv, 'long.rowtime, 'int, 'string)` (see the sketch after this list).
• In a streaming environment: if the "long" field is already defined in the record, it will not be read by the runtime; "long" always represents the timestamp of the row.
• In a batch environment: "long" must be present in the record and will be read by the runtime.
• The table definition looks equivalent in both batch and streaming (better unification than the current state).
• Internally, row types are split into a logical and a physical row type.
• The logical row type contains time indicators; the physical row type never does (the pure "long" will never be in a record).
• After validation and query decorrelation, a special time indicator converter traverses the RelNodes and analyzes whether a time indicator is accessed or only forwarded.
• An access to a time indicator means that we need to materialize the rowtime using a ProcessFunction (not yet implemented). The timestamp (not an indicator anymore) becomes part of the physical row. E.g., `long.cast(STRING)` would require a materialization.
• Forwarding of time indicators does not materialize the rowtime; it remains a logical attribute. E.g., `.select('long)`.
• Windows are only valid if they work on time indicators.
• A new `RowSchema` unifies logical and physical rows and tries to make type handling throughout the classes easier and more consistent (it is not used everywhere yet).
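
For illustration, a minimal sketch of the two definition styles listed above (the `env`/`tEnv` setup boilerplate is assumed and not part of the PR):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// standard setup, assumed for this sketch
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, Int, String)] = env.fromElements((1L, 1, "Hello"))

// event time: 'long.rowtime marks the first field as the logical rowtime
// indicator; the runtime uses the record's timestamp, not the field value
val withRowtime = stream.toTable(tEnv, 'long.rowtime, 'int, 'string)

// processing time: 'proctime.proctime appends a purely logical attribute
val withProctime = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
```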

          Missing so far:

          • Java API tests for defining time attributes
          • Tests for TableSources that define time attributes
          • Multiwindow tests
          • ProcessFunction for accessing the timestamp
          • More tests in general

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-5884

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3808.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3808


          commit 5550a6026614ef61a62e98654ee2cd2d45b6d64e
          Author: twalthr <twalthr@apache.org>
          Date: 2017-03-02T15:06:55Z

          FLINK-5884 [table] Integrate time indicators for Table API & SQL


githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114694410

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala —
@@ -62,31 +62,30 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+  def testInvalidProcessingTimeDefinition(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
   }

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+  def testInvalidProcessingTimeDefinition2(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }

-    table
-      .window(Tumble over 2.rows as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
   }

   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
— End diff –

Do we need to add a check that ensures the rowtime column has the correct data type?
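
For reference, a minimal sketch of what such a check could look like (the helper and its placement are assumptions, not part of the PR under review):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.api.ValidationException

// hypothetical helper: reject a rowtime attribute whose underlying field is
// neither a Long nor a SQL timestamp
def validateRowtimeType(fieldName: String, fieldType: TypeInformation[_]): Unit = {
  if (fieldType != BasicTypeInfo.LONG_TYPE_INFO && fieldType != SqlTimeTypeInfo.TIMESTAMP) {
    throw new ValidationException(
      s"Rowtime attribute '$fieldName' must be of type Long or Timestamp, but is $fieldType.")
  }
}
```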

githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114697114

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —
@@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
     }
   }
 }
+
+abstract class TimeAttribute(val expression: Expression)
+  extends UnaryExpression
+  with NamedExpression {
+
+  override private[flink] def child: Expression = expression
+
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
+  override private[flink] def toAttribute: Attribute =
+    throw new UnsupportedOperationException("Time attribute can not be used solely.")
+}
+
+case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
— End diff –

I think the `expr` of `RowtimeAttribute` must be defined on an existing field in a batch environment, but in a streaming environment it should not be a physical field. What do you think?

githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114697132

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —
@@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
     }
   }
 }
+
+abstract class TimeAttribute(val expression: Expression)
+  extends UnaryExpression
+  with NamedExpression {
+
+  override private[flink] def child: Expression = expression
+
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
+  override private[flink] def toAttribute: Attribute =
+    throw new UnsupportedOperationException("Time attribute can not be used solely.")
+}
+
+case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+}
+
+case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
— End diff –

I think the `expr` of `ProctimeAttribute` should not be a physical field, because we never use the value of the physical field. E.g., we cannot use `ProctimeAttribute` as follows:
`// data: (1L, 1, "Hello")`
`env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c.proctime)`
What do you think?

githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114694072

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala —
@@ -62,31 +62,30 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+  def testInvalidProcessingTimeDefinition(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
   }

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+  def testInvalidProcessingTimeDefinition2(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }

-    table
-      .window(Tumble over 2.rows as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
   }

   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

     val windowedTable = table
       .window(Tumble over 2.rows on 'long as 'w)
— End diff –

Currently, `'long.rowtime` does not play a role. We can remove `.rowtime` or move it to another column, and it still works. I think we need to ensure that `.rowtime` is appended to `'long`, because we invoke the `on` method with `'long`. What do you think?

githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114697863

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala —
@@ -62,10 +62,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
— End diff –

I think we can use `.rowtime` in this way instead:
`stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string, 'rowtime.rowtime)`
because we do not need the value of the `'long` field. It is just an indicator in a stream.

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114783969

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -782,6 +842,10 @@ abstract class TableEnvironment(val config: TableConfig) {
   */
 object TableEnvironment {

+  // default names that can be used in in TableSources etc.
+  val DEFAULT_ROWTIME_ATTRIBUTE = "rowtime"
— End diff –

          I think these are not used. Can they be removed?

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114761137

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
@@ -194,7 +194,30 @@ abstract class BatchTableEnvironment(
   protected def registerDataSetInternal[T](
     name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {

-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+      dataSet.getType,
+      fields,
+      ignoreTimeAttributes = true)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+
+    // don't allow proctime on batch
+    proctime match {
— End diff –

          If I understood it right, `rowtime` is allowed on batch tables but not interpreted (it is not passed on).
          I would not allow `rowtime` for batch tables. I don't think there is a reason for time indicators in batch mode.
          Any Long / Timestamp attribute can be used for windows, etc.

          Regarding compatibility of batch and streaming, I would argue that the table definition is not part of the query and does not need to be unified, IMO.

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114809690

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —
@@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
     }
   }
 }
+
+abstract class TimeAttribute(val expression: Expression)
+  extends UnaryExpression
+  with NamedExpression {
+
+  override private[flink] def child: Expression = expression
+
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
+  override private[flink] def toAttribute: Attribute =
+    throw new UnsupportedOperationException("Time attribute can not be used solely.")
+}
+
+case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+}
+
+case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
— End diff –
          — End diff –

          I agree with you, `'c` in your example should not be an existing field, i.e., `data` should be a type with only 2 fields. We need the `expr` field in `ProctimeAttribute` to remember the name of the logical field that holds the processing time.

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114781827

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -577,70 +577,94 @@ abstract class TableEnvironment(val config: TableConfig) {

   /**
     * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
+    * [[Expression]]. It does not handle time attributes but considers them in indices, if
+    * ignore flag is not false.
     *
     * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
     * @param exprs The expressions that define the field names.
+    * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](
-    inputType: TypeInformation[A],
-    exprs: Array[Expression]): (Array[String], Array[Int]) = {
+      inputType: TypeInformation[A],
+      exprs: Array[Expression],
+      ignoreTimeAttributes: Boolean)
+    : (Array[String], Array[Int]) = {

     TableEnvironment.validateType(inputType)

+    val filteredExprs = if (ignoreTimeAttributes) {
+      exprs.map {
+        case ta: TimeAttribute => ta.expression
+        case e@_ => e
+      }
+    } else {
+      exprs
+    }
+
     val indexedNames: Array[(Int, String)] = inputType match {
       case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
             "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
-        if (exprs.length != 1) {
-          throw new TableException("Table of atomic type can only have a single field.")
-        }
-        exprs.map {
-          case UnresolvedFieldReference(name) => (0, name)
+        filteredExprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            if (idx > 0) {
— End diff –

          Is this correct? Couldn't we have a `DataStream[String]` which gets mapped to a `Table` with schema `'t.rowtime, 'name`? In this case the `idx` of `'name` would be `1`.
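
For illustration, the case described above as a sketch (assuming the `env`/`tEnv` setup from the earlier examples; whether this should be accepted is exactly the open question):

```scala
// an atomic-type stream: the single physical field sits at position 0
val names: DataStream[String] = env.fromElements("Alice", "Bob")

// schema ('t.rowtime, 'name): 't is a purely logical time attribute, so 'name
// must still map to the atomic field although its expression index idx is 1
val table = names.toTable(tEnv, 't.rowtime, 'name)
```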

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114896129

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala —
@@ -44,36 +44,34 @@ trait CommonCorrelate {
     */
   private[flink] def correlateMapFunction(
       config: TableConfig,
-      inputTypeInfo: TypeInformation[Row],
+      inputSchema: RowSchema,
       udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
+      returnSchema: RowSchema,
       joinType: SemiJoinType,
       rexCall: RexCall,
       condition: Option[RexNode],
       pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
       ruleDescription: String)
     : CorrelateFlatMapRunner[Row, Row] = {
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
-
     val flatMap = generateFunction(
       config,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       udtfTypeInfo,
-      returnType,
-      rowType,
+      returnSchema.physicalTypeInfo,
+      returnSchema.logicalFieldNames,
       joinType,
-      rexCall,
+      inputSchema.mapRexNode(rexCall).asInstanceOf[RexCall],
       pojoFieldMapping,
       ruleDescription)

     val collector = generateCollector(
       config,
-      inputTypeInfo,
+      inputSchema.physicalTypeInfo,
       udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
+      returnSchema.physicalTypeInfo,
+      returnSchema.logicalFieldNames,
+      condition.map(inputSchema.mapRexNode),
— End diff –

          Do we need to check that `condition` does not contain a time indicator?

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114783322

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
@@ -774,6 +798,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     Some(mapFunction)
   }

+  /**
+    * Checks for at most one rowtime and proctime attribute.
+    * Returns the time attributes.
+    *
+    * @return rowtime attribute and proctime attribute
+    */
+  protected def validateAndExtractTimeAttributes(
— End diff –

          Can be moved to `StreamTableEnvironment` if we do not allow `TimeAttribute`s in batch environments.

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114820697

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala —
@@ -35,21 +34,30 @@ import scala.collection.JavaConverters._
 trait CommonCalc {

   private[flink] def functionBody(
-    generator: CodeGenerator,
-    inputType: TypeInformation[Row],
-    rowType: RelDataType,
-    calcProgram: RexProgram,
-    config: TableConfig)
+      generator: CodeGenerator,
— End diff –

Revert the indentation change.

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114888441

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala —
@@ -35,21 +34,30 @@ import scala.collection.JavaConverters._
 trait CommonCalc {

   private[flink] def functionBody(
-    generator: CodeGenerator,
-    inputType: TypeInformation[Row],
-    rowType: RelDataType,
-    calcProgram: RexProgram,
-    config: TableConfig)
+      generator: CodeGenerator,
+      inputSchema: RowSchema,
+      returnSchema: RowSchema,
+      calcProgram: RexProgram,
+      config: TableConfig)
     : String = {
-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+    val expandedExpressions = calcProgram
+      .getProjectList
+      .map(expr => calcProgram.expandLocalRef(expr))
+      // time indicator fields must not be part of the code generation
+      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+      // update indices
+      .map(expr => inputSchema.mapRexNode(expr))
+
+    val condition = if (calcProgram.getCondition != null) {
+      inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))
— End diff –

          Should we validate that a filter condition does not reference a time indicator?
          This
          ```
          .toTable(tEnv, 'a, 'b, 'c, 'd.rowtime)
          .filter('d === 'd + 4.millis)
          .select('a, 'b, 'c)
          ```
          yields
          ```
          org.apache.flink.table.api.TableException: Invalid access to a logical field.
          at org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:127)
          ```

          A better error message should explain that time indicators cannot be used in conditions yet.
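
For illustration, a sketch of a guard that could produce such a message, reusing `FlinkTypeFactory.isTimeIndicatorType` from the diff above (the traversal via Calcite's `RelOptUtil.InputFinder` is an assumption, not what the PR currently does):

```scala
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexNode
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import scala.collection.JavaConverters._

// hypothetical guard: fail with a descriptive message if a condition
// references an input field whose type is a time indicator
def checkNoTimeIndicatorInCondition(condition: RexNode, inputType: RelDataType): Unit = {
  val usesIndicator = RelOptUtil.InputFinder.bits(condition).asScala.exists { i =>
    FlinkTypeFactory.isTimeIndicatorType(inputType.getFieldList.get(i.intValue()).getType)
  }
  if (usesIndicator) {
    throw new TableException(
      "Time indicators cannot be used in filter conditions yet. " +
        "Project the time attribute into a regular field before filtering on it.")
  }
}
```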

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114907167

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -60,26 +61,31 @@ object AggregateUtil {
     * window to evaluate final aggregate value.
     *
     * @param generator code generator instance
-    * @param namedAggregates List of calls to aggregate functions and their output field names
-    * @param inputType Input row type
+    * @param namedAggregates Physical calls to aggregate functions and their output field names
+    * @param inputType Physical type of the row.
+    * @param inputTypeInfo Physical type information of the row.
+    * @param inputFieldTypeInfo Physical type information of the row's fields.
     * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
     * @param isPartitioned It is a tag that indicate whether the input is partitioned
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
   private[flink] def createUnboundedOverProcessFunction(
-    generator: CodeGenerator,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    inputType: RelDataType,
-    isRowTimeType: Boolean,
-    isPartitioned: Boolean,
-    isRowsClause: Boolean): ProcessFunction[Row, Row] = {
+      generator: CodeGenerator,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      inputTypeInfo: TypeInformation[Row],
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
+      isRowTimeType: Boolean,
+      isPartitioned: Boolean,
+      isRowsClause: Boolean)
+    : ProcessFunction[Row, Row] = {

     val needRetract = false
— End diff –

          remove or revert the next change

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114793482

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala —
@@ -178,6 +236,7 @@ object FlinkTypeFactory {
   /**
     * Converts a Calcite logical record into a Flink type information.
     */
+  @deprecated("Use the RowSchema class instead because it handles both logical and physical rows.")
— End diff –

          Create a follow up issue to clean this up?

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114791208

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala —
@@ -77,23 +83,75 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }

   /**
+    * Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
+    */
+  def createProctimeIndicatorType(): RelDataType = {
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime = false)
+    )
+  }
+
+  /**
+    * Creates a indicator type for event-time, but with similar properties as SQL timestamp.
+    */
+  def createRowtimeIndicatorType(): RelDataType = {
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    canonize(
+      new TimeIndicatorRelDataType(
+        getTypeSystem,
+        originalType.asInstanceOf[BasicSqlType],
+        isEventTime = true)
+    )
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
     * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @return a struct type with the input fieldNames and input fieldTypes
+    * @param rowtime optional system field to indicate event-time; the index determines the index
+    *                in the final record and might replace an existing field
— End diff –

Existing fields are shifted, not replaced, correct?

githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114814140

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala —

@@ -18,259 +18,165 @@

 package org.apache.flink.table.plan.logical

-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}

-abstract class EventTimeGroupWindow(
-    alias: Expression,
-    time: Expression)
-  extends LogicalWindow(alias) {
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult = {
-    val valid = super.validate(tableEnv)
-    if (valid.isFailure) {
-      return valid
-    }
-
-    tableEnv match {
-      case _: StreamTableEnvironment =>
-        time match {
-          case RowtimeAttribute() =>
-            ValidationSuccess
-          case _ =>
-            ValidationFailure("Event-time window expects a 'rowtime' time field.")
-        }
-      case _: BatchTableEnvironment =>
-        if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
-          ValidationFailure(s"Event-time window expects a time field that can be safely cast " +
-            s"to Long, but is ${time.resultType}")
-        } else {
-          ValidationSuccess
-        }
-    }
-  }
-}
-
-abstract class ProcessingTimeGroupWindow(alias: Expression) extends LogicalWindow(alias) {
-  override def validate(tableEnv: TableEnvironment): ValidationResult = {
-    val valid = super.validate(tableEnv)
-    if (valid.isFailure) {
-      return valid
-    }
-
-    tableEnv match {
-      case b: BatchTableEnvironment => ValidationFailure(
-        "Window on batch must declare a time attribute over which the query is evaluated.")
-      case _ =>
-        ValidationSuccess
-    }
-  }
-}
-
 // ------------------------------------------------------------------------------------------------
 // Tumbling group windows
 // ------------------------------------------------------------------------------------------------

-object TumblingGroupWindow {
-  def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match {
-    case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-      ValidationSuccess
-    case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
-      ValidationSuccess
-    case _ =>
-      ValidationFailure("Tumbling window expects size literal of type Interval of Milliseconds " +
-        "or Interval of Rows.")
-  }
-}
-
-case class ProcessingTimeTumblingGroupWindow(
-    override val alias: Expression,
-    size: Expression)
-  extends ProcessingTimeGroupWindow(alias) {
-
-  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    ProcessingTimeTumblingGroupWindow(
-      resolve(alias),
-      resolve(size))
-
-  override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
-
-  override def toString: String = s"ProcessingTimeTumblingGroupWindow($alias, $size)"
-}
-
-case class EventTimeTumblingGroupWindow(
-    override val alias: Expression,
+case class TumblingGroupWindow(
+    alias: Expression,
     timeField: Expression,
     size: Expression)
-  extends EventTimeGroupWindow(
+  extends LogicalWindow(
     alias,
     timeField) {

   override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
-    EventTimeTumblingGroupWindow(
+    TumblingGroupWindow(
       resolve(alias),
       resolve(timeField),
       resolve(size))

   override def validate(tableEnv: TableEnvironment): ValidationResult =
-    super.validate(tableEnv)
-      .orElse(TumblingGroupWindow.validate(tableEnv, size))
-      .orElse(size match {
-        case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
-            if tableEnv.isInstanceOf[StreamTableEnvironment] =>
+    super.validate(tableEnv).orElse(
+      tableEnv match {
+
+        // check size
+        case _ if !isTimeIntervalLiteral(size) && !isRowCountLiteral(size) =>
+          ValidationFailure(
+            "Tumbling window expects size literal of type Interval of Milliseconds " +
+            "or Interval of Rows.")
+
+        // check time attribute
+        case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
+          ValidationFailure(
+            "Tumbling window expects a time attribute for grouping in a stream environment.")
+        case _: BatchTableEnvironment if isTimePoint(size.resultType) =>

— End diff –

          Should this be something like `!(isTimePoint(timeField.resultType) || isLong(timeField.resultType))`?

Same for the other windows?
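
For reference, a sketch of the batch-side check this suggests, reusing the names visible in the diff above and assuming an `isLong` helper exists alongside `isTimePoint`:
```
// check time attribute in a batch environment (sketch, not the PR's code)
case _: BatchTableEnvironment
    if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
  ValidationFailure(
    "Tumbling window expects a time field that is a time point or Long in a batch environment.")
```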

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114811107

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala —

@@ -22,14 +22,24 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.{Expression, WindowReference}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}

-abstract class LogicalWindow(val alias: Expression) extends Resolvable[LogicalWindow] {
+/**
+ * Logical super class for all types of windows (group-windows and row-windows).

— End diff –

          So far it is only used for group windows.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114778319

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —

@@ -577,70 +577,94 @@ abstract class TableEnvironment(val config: TableConfig) {

   /**
     * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
+    * [[Expression]]. It does not handle time attributes but considers them in indices, if
+    * ignore flag is not false.
     *
     * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
     * @param exprs The expressions that define the field names.
+    * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](
-    inputType: TypeInformation[A],
-    exprs: Array[Expression]): (Array[String], Array[Int]) = {
+      inputType: TypeInformation[A],
+      exprs: Array[Expression],
+      ignoreTimeAttributes: Boolean)

— End diff –

          Can we remove this attribute and expect the caller of this method to unwrap expressions from the time attribute?
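
A minimal sketch of that alternative, assuming a `ProctimeAttribute` case class analogous to the `RowtimeAttribute` shown elsewhere in this PR; the caller would unwrap time attributes before calling `getFieldInfo`:
```
// unwrap time attributes at the call site (sketch)
val physicalExprs = fields.map {
  case RowtimeAttribute(expr) => expr
  case ProctimeAttribute(expr) => expr
  case e => e
}
val (fieldNames, fieldIndexes) = getFieldInfo(dataSet.getType, physicalExprs)
```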

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114892939

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
@@ -145,6 +105,7 @@ abstract class StreamTableEnvironment(
         "StreamTableEnvironment")
     }
   }
+
   /**
— End diff –

Converting a Table that projects a time indicator into a DataStream[Row] as follows
```
val ds = stream.toTable(tEnv, 'a, 'b, 'c, 'd.rowtime)
  .select('a, 'b, 'c, 'd + 2.hours)
val results = ds.toDataStream[Row]
```
          yields the following exception
          ```
          org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match.This is a bug and should not happen. Please file an issue.

          at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
          at org.apache.flink.table.api.TableEnvironment.sinkConversion(TableEnvironment.scala:691)
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114891828

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala —

@@ -35,21 +34,30 @@ import scala.collection.JavaConverters._
 trait CommonCalc {

   private[flink] def functionBody(
-    generator: CodeGenerator,
-    inputType: TypeInformation[Row],
-    rowType: RelDataType,
-    calcProgram: RexProgram,
-    config: TableConfig)
+      generator: CodeGenerator,
+      inputSchema: RowSchema,
+      returnSchema: RowSchema,
+      calcProgram: RexProgram,
+      config: TableConfig)
     : String = {

-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+    val expandedExpressions = calcProgram
+      .getProjectList
+      .map(expr => calcProgram.expandLocalRef(expr))
+      // time indicator fields must not be part of the code generation
+      .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType))
+      // update indices
+      .map(expr => inputSchema.mapRexNode(expr))
+
+    val condition = if (calcProgram.getCondition != null) {
+      inputSchema.mapRexNode(calcProgram.expandLocalRef(calcProgram.getCondition))

— End diff –

The same happens with a SQL query.
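
For example, a SQL query of the same shape, assuming the table above is registered as `T` with `d` as its rowtime attribute, should hit the same mismatch:
```
val result = tEnv.sql("SELECT a, b, c, d + INTERVAL '2' HOUR FROM T")
result.toDataStream[Row]
```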

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114899203

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala —

@@ -83,24 +86,28 @@ class DataSetCalc(

     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

-    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
     val generator = new CodeGenerator(config, false, inputDS.getType)

+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
     val body = functionBody(
       generator,
-      inputDS.getType,
-      getRowType,
+      new RowSchema(getInput.getRowType),
+      new RowSchema(getRowType),
       calcProgram,
       config)

     val genFunction = generator.generateFunction(
       ruleDescription,
       classOf[FlatMapFunction[Row, Row]],
       body,
-      returnType)
+      rowTypeInfo)
+
+    val runner = new FlatMapRunner[Row, Row](

— End diff –

Why not keep `calcMapFunction`? It should do the same.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114904879

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala —
          @@ -0,0 +1,47 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.sources
          +
          +import org.apache.flink.api.java.tuple.Tuple2
          +
          +/**
          + * Defines logical time attributes for a [[TableSource]]. Time attributes can be used for
          + * indicating, accessing, and working with Flink's event-time or processing-time. A
          + * [[TableSource]] that implements this interface can define names and positions of rowtime
          + * and proctime attributes in the rows it produces.
          + */
          +trait DefinedTimeAttributes {
          +
          + /**
          + * Defines a name and position (starting at 0) of rowtime attribute that represents Flink's
          + * event-time. Null if no rowtime should be available. If the position is within the arity of
          + * the result row, the logical attribute will overwrite the physical attribute. If the position
          + * is higher than the result row, the time attribute will be appended logically.
          + */
          + def getRowtimeAttribute: Tuple2[Int, String]
          — End diff –

I don't think we need to offer the option to override an existing attribute for TableSources. They can modify the schema themselves and return the data without the actual time attribute.
So `getRowtimeAttribute` would just define the name of the logical attribute.

          Same for proctime.
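
A sketch of that simplified contract, with hypothetical signatures (the actual trait in this PR returns `Tuple2[Int, String]`):
```
trait DefinedTimeAttributes {
  /** Name of the appended logical rowtime attribute, or null if none. */
  def getRowtimeAttribute: String
  /** Name of the appended logical proctime attribute, or null if none. */
  def getProctimeAttribute: String
}
```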

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114910572

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala —

@@ -62,31 +62,30 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+  def testInvalidProcessingTimeDefinition(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
   }

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+  def testInvalidProcessingTimeDefinition2(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }

-    table
-      .window(Tumble over 2.rows as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
   }

   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

     val windowedTable = table
       .window(Tumble over 2.rows on 'long as 'w)

— End diff –

          I think we do not need `.rowtime` for batch tables. On batch tables, any time attribute (long, timestamp) can be used for windows. Hence, defining a time column is not required and would IMO only confuse users.
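
For example, under this proposal a batch window would simply reference the physical column, without any declaration (sketch using the test's table):
```
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)

table
  .window(Tumble over 2.rows on 'long as 'w)
  .groupBy('w, 'string)
  .select('string, 'int.count)
```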

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114904443

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala —
          @@ -0,0 +1,47 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.sources
          +
          +import org.apache.flink.api.java.tuple.Tuple2
          +
          +/**
          + * Defines logical time attributes for a [[TableSource]]. Time attributes can be used for
          + * indicating, accessing, and working with Flink's event-time or processing-time. A
          + * [[TableSource]] that implements this interface can define names and positions of rowtime
          + * and proctime attributes in the rows it produces.
          + */
          +trait DefinedTimeAttributes {
          — End diff –

          I would split this trait into `DefinesRowtimeAttribute` and `DefinesProctimeAttribute`
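
A sketch of the proposed split, keeping the return type of the original trait (the trait names are the ones suggested above, not code from this PR):
```
trait DefinesRowtimeAttribute {
  def getRowtimeAttribute: Tuple2[Int, String]
}

trait DefinesProctimeAttribute {
  def getProctimeAttribute: Tuple2[Int, String]
}
```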

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114913659

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala —

@@ -37,7 +38,50 @@ class StreamTableSourceScan(
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
   with StreamScan {

-  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    def removeIndex[T](idx: Int, l: List[T]): List[T] = {
+      if (l.size < idx) {
+        l
+      } else {
+        l.take(idx) ++ l.drop(idx + 1)
+      }
+    }
+
+    var fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+    var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+    val rowtime = tableSource match {
+      case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null =>

— End diff –

There are no tests for `TableSource`s that implement `DefinedTimeAttributes`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114808765

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —

@@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
     }
   }
 }
+
+abstract class TimeAttribute(val expression: Expression)
+  extends UnaryExpression
+  with NamedExpression {
+
+  override private[flink] def child: Expression = expression
+
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
+  override private[flink] def toAttribute: Attribute =
+    throw new UnsupportedOperationException("Time attribute can not be used solely.")
+}
+
+case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {

— End diff –

I think we should not allow `TimeAttribute` in batch environments.
In stream environments, there are different cases to distinguish (see the sketch after this list):

• rowtime on a new attribute (i.e., defining it as an additional attribute at the end): adds a new logical attribute to the schema.
• rowtime on an existing attribute: replaces the existing (physical) attribute with a new logical attribute. The reason is that the data might contain a timestamp attribute which should be replaced by the logical timestamp attribute. If we only allowed rowtime to be defined on non-existing attributes, we would have two timestamp attributes, one that can be used as a time indicator and one that can't. I agree, this is not a super nice solution, but the best we could think of. Please let us know if you have a better idea. Btw., I expect most queries to consume data from TableSources. Those won't have this problem.
• proctime on a new field (i.e., defining it as an additional attribute at the end): adds a new logical attribute.
• proctime on an existing field: should be forbidden, IMO.
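
A sketch of the four cases in Table API syntax (hypothetical field names):
```
// rowtime as a new attribute appended at the end
stream.toTable(tEnv, 'a, 'b, 'rowtime.rowtime)
// rowtime replacing the existing physical timestamp attribute 'ts
stream.toTable(tEnv, 'ts.rowtime, 'a, 'b)
// proctime as a new attribute appended at the end
stream.toTable(tEnv, 'a, 'b, 'proctime.proctime)
// proctime on an existing field: should fail validation
stream.toTable(tEnv, 'a.proctime, 'b)
```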
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114911704

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala —
@@ -83,15 +84,16 @@ class WindowAggregateTest extends TableTestBase {
   }

   @Test
+  @Ignore // TODO enable once CALCITE-1761 is fixed
— End diff –

          Fix was merged to master, right?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114904005

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala —

@@ -37,7 +38,50 @@ class StreamTableSourceScan(
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
   with StreamScan {

-  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+  override def deriveRowType() = {
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    def removeIndex[T](idx: Int, l: List[T]): List[T] = {

— End diff –

          Can be done with `List.patch(idx, Nil, 1)`
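
For the record, `patch(from, other, replaced)` replaces `replaced` elements starting at index `from` with `other`:
```
List("a", "b", "c", "d").patch(2, Nil, 1)  // List(a, b, d): drops the element at index 2
```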

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114913093

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala —

@@ -62,10 +62,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)

— End diff –

Both are possible. You can replace an existing attribute (such as `'long`) or add a new logical attribute (such as `'rowtime`).
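
Both variants on the test's schema would look like this (sketch):
```
// replace the existing physical attribute 'long
stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
// or append a new logical attribute at the end
stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string, 'rowtime.rowtime)
```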

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114911457

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala —

@@ -652,6 +654,20 @@ trait ImplicitExpressionOperations {
     * @return the first and only element of an array with a single element
     */
   def element() = ArrayElement(expr)
+
+  // Schema definition
+
+  /**
+    * Declares a field as the rowtime attribute for indicating, accessing, and working in
+    * Flink's event time.
+    */
+  def rowtime = RowtimeAttribute(expr)

— End diff –

          The `ExpressionParser` needs to be adapted as well to support time indicator definitions with Java, right?
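
A quick way to exercise the adapted parser, assuming `ExpressionParser.parseExpressionList` keeps its current shape (a sketch, not a test from this PR):
```
val exprs = ExpressionParser.parseExpressionList("long.rowtime, int, string")
// expected: exprs.head resolves to RowtimeAttribute(UnresolvedFieldReference("long"))
```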

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114910197

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala —

@@ -62,31 +62,30 @@ class GroupWindowTest extends TableTestBase {
   //===============================================================================================

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+  def testInvalidProcessingTimeDefinition(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 50.milli as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
   }

   @Test(expected = classOf[ValidationException])
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+  def testInvalidProcessingTimeDefinition2(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }

-    table
-      .window(Tumble over 2.rows as 'w) // require a time attribute
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
   }

   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)

— End diff –

          +1, we should only replace attributes with valid time types (timestamp, long) for stream tables.
          I would not allow `.rowtime` for batch tables.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3808

          I didn't look at `RelTimeIndicatorConverter` yet.
          Will do so tomorrow.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114920160

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —

@@ -194,7 +194,30 @@ abstract class BatchTableEnvironment(
   protected def registerDataSetInternal[T](
     name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {

-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](
+      dataSet.getType,
+      fields,
+      ignoreTimeAttributes = true)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+
+    // don't allow proctime on batch
+    proctime match {

— End diff –

In fact, in batch mode we only support event-time, and the column that represents the rowtime is not an indicator but a real, existing field. We only care about the data type of that field. So, IMO, we should use neither `.proctime` nor `.rowtime` in batch mode.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114925594

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —
          @@ -150,3 +133,30 @@ case class WindowReference(name: String) extends Attribute {
          }
          }
          }
          +
          +abstract class TimeAttribute(val expression: Expression)
          + extends UnaryExpression
          + with NamedExpression {
          +
          + override private[flink] def child: Expression = expression
          +
+  override private[flink] def name: String = expression match {
+    case UnresolvedFieldReference(name) => name
+    case _ => throw new ValidationException("Unresolved field reference expected.")
+  }
+
          + override private[flink] def toAttribute: Attribute =
          + throw new UnsupportedOperationException("Time attribute can not be used solely.")
          +}
          +
          +case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
          — End diff –

1. Not allowing a standalone `TimeAttribute` makes sense to me. +1
2. Forbidding `proctime` on an existing field is what I want to see. +1
3. About `rowtime` on an existing attribute, I'm not sure. But I think that in a stream environment, if we want to query the `rowtime` column, it should be a physical field; if it is only an indicator of the time mode, it should not be a physical field, because there is no place to apply this value. I left a question for @twalthr in my first review summary, so I would like to know @twalthr's opinion.
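For context on how these case classes behave, a small sketch based only on the code in the diff above (the assumption that `'long.rowtime` desugars to this constructor call is mine, not stated in the diff):

```
// Assumed desugaring of 'long.rowtime: a RowtimeAttribute wrapping an
// unresolved field reference.
val attr = RowtimeAttribute(UnresolvedFieldReference("long"))

// Both members are private[flink], so this only works inside the Flink code base:
attr.name        // "long" -- extracted by the match in TimeAttribute.name
attr.toAttribute // throws UnsupportedOperationException
                 // ("Time attribute can not be used solely.")
```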

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114923783

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala —
          @@ -577,70 +577,94 @@ abstract class TableEnvironment(val config: TableConfig) {

   /**
     * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
+    * [[Expression]]. It does not handle time attributes but considers them in indices, if
+    * ignore flag is not false.
     *
     * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
     * @param exprs The expressions that define the field names.
+    * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression]): (Array[String], Array[Int]) = {
+      inputType: TypeInformation[A],
+      exprs: Array[Expression],
+      ignoreTimeAttributes: Boolean)
+    : (Array[String], Array[Int]) = {

     TableEnvironment.validateType(inputType)

+    val filteredExprs = if (ignoreTimeAttributes) {
+      exprs.map {
+        case ta: TimeAttribute => ta.expression
+        case e@_ => e
+      }
+    } else {
+      exprs
+    }
+
     val indexedNames: Array[(Int, String)] = inputType match {
       case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
           "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
-        if (exprs.length != 1) {
-          throw new TableException("Table of atomic type can only have a single field.")
-        }
-        exprs.map {
-          case UnresolvedFieldReference(name) => (0, name)
+        filteredExprs.zipWithIndex flatMap {
+          case (UnresolvedFieldReference(name), idx) =>
+            if (idx > 0) {
+              throw new TableException("Table of atomic type can only have a single field.")
+            }
+            Some((0, name))
+          case (_: TimeAttribute, _) if ignoreTimeAttributes =>

— End diff –

Can we remove this `if ignoreTimeAttributes` check? Because in DataStream mode we also need to parse the `TimeAttribute`, e.g.:
`streamTestUtil().addTable[String]('string, 't.rowtime)`
What do you think?
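To illustrate the step being questioned, here is the unwrapping from the diff in isolation (the input values are illustrative):

```
// With ignoreTimeAttributes = true, time attributes are unwrapped to their
// underlying field expressions, so field indices still line up.
val exprs: Array[Expression] = Array(
  RowtimeAttribute(UnresolvedFieldReference("long")),
  UnresolvedFieldReference("int"),
  UnresolvedFieldReference("string"))

val filteredExprs = exprs.map {
  case ta: TimeAttribute => ta.expression  // 'long.rowtime becomes 'long
  case e => e
}
// filteredExprs now holds plain field references; the wrappers are gone.
```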

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3808#discussion_r114977155

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -145,6 +105,7 @@ abstract class StreamTableEnvironment(
          "StreamTableEnvironment")
          }
          }
          +
          /**
          — End diff –

          Time materialization needs to be applied to the output as well. This should solve the issue.
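As background, the PR description frames time materialization as copying the record's meta timestamp into the row via a ProcessFunction. A minimal sketch of that idea (illustrative only; this is not the PR's generated code, and the class name is made up):

```
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Copies all fields and overwrites the logical rowtime indicator at
// position rowtimeIdx with the record's meta timestamp, turning it into
// a physical long field on output.
class MaterializeRowtime(rowtimeIdx: Int, arity: Int)
  extends ProcessFunction[Row, Row] {

  override def processElement(
      in: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {
    val result = new Row(arity)
    var i = 0
    while (i < arity) {
      result.setField(i, in.getField(i))
      i += 1
    }
    result.setField(rowtimeIdx, ctx.timestamp())
    out.collect(result)
  }
}
```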

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3808

          Hi @twalthr, I addressed my comments, rebased to current master, and will merge this PR given that all tests pass.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3808

          fhueske Fabian Hueske added a comment -

          Implemented with 24bf61ceb332f2db2dc4bab624b73beffae1160a


            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              twalthr Timo Walther
• Votes:
  0
  Watchers:
  5