Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels: None

Description

FLINK-5884 added support for time indicators. However, some features are still missing, e.g. the materialization of the metadata timestamp.

Issue Links

Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3862

          FLINK-6483 [table] Support time materialization

          This PR adds support for time materialization. It also fixes several bugs related to time handling in the Table API & SQL.
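
          A minimal sketch of the behavior this PR enables, built from the tables used
          in the tests below (the printed plan terms are illustrative):

              val table = stream.toTable(tEnv, 'rowtime.rowtime, 'long, 'int)

              // 'rowtime stays a logical time indicator until an expression accesses
              // its value; the filter below forces the planner to materialize it into
              // a real TIMESTAMP column (TIME_MATERIALIZATION in the printed plan).
              val result = table
                .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
                .select('rowtime)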

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-6483

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3862.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3862


          commit 12c6e2214df4da4f7aeaa511e278f6ce9d80595a
          Author: twalthr <twalthr@apache.org>
          Date: 2017-05-10T08:11:34Z

          FLINK-6483 [table] Support time materialization


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115705183

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -1840,6 +1859,35 @@ class CodeGenerator(
          }
          }

          + private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
          + val resultTerm = newName("result")
          + val nullTerm = newName("isNull")
          + val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
          + val defaultValue = primitiveDefaultValue(SqlTimeTypeInfo.TIMESTAMP)
          +
          + if (isEventTime) {
          + val resultCode =
          + s"""
          + |boolean $nullTerm = $contextTerm.timestamp() == null;
          — End diff –

          Maybe it even makes sense to do this check once at the beginning of a query to avoid repeated checks whenever we materialize the time. I'd consider this a future improvement.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115704440

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
          @@ -1840,6 +1859,35 @@ class CodeGenerator(
          }
          }

          + private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = {
          + val resultTerm = newName("result")
          + val nullTerm = newName("isNull")
          + val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
          + val defaultValue = primitiveDefaultValue(SqlTimeTypeInfo.TIMESTAMP)
          +
          + if (isEventTime) {
          + val resultCode =
          + s"""
          + |boolean $nullTerm = $contextTerm.timestamp() == null;
          — End diff –

          I think we should throw an exception if the timestamp is `null`.
          The query only accesses the timestamp if it explicitly asks for event-time. If the timestamp is not set, the query should fail, IMO.

          Additionally, we should add a check to the `StreamTableEnvironment` (and the `TableSourceTable`) that verifies that event-time is enabled in the `StreamExecutionEnvironment`.
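
          For illustration, the generated guard could look roughly like the following
          sketch (built from the terms visible in the diff above; the exception type and
          message are assumptions, not part of this PR):

              val resultCode =
                s"""
                   |if ($contextTerm.timestamp() == null) {
                   |  throw new RuntimeException("Rowtime timestamp is null. Is the "
                   |    + "time characteristic set to event-time?");
                   |}
                   |$resultTypeTerm $resultTerm = $contextTerm.timestamp();
                   |""".stripMargin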

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115719405

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -0,0 +1,284 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.calcite
          +
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.ValidationException
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
          +import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.TumblingGroupWindow
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.junit.Test
          +
          +/**
          + * Tests for [[RelTimeIndicatorConverter]].
          + */
          +class RelTimeIndicatorConverterTest extends TableTestBase {
          +
          + @Test
          + def testSimpleMaterialization(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t
          +     .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
          +     .filter('long > 0)
          +     .select('rowtime)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
          +     term("where", ">(long, 0)")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test
          + def testSelectAll(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t.select('*)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
          +       "TIME_MATERIALIZATION(proctime) AS proctime")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test
          + def testFilteringOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
          +     .select('rowtime)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
          +     term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testGroupingOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t
          +     .groupBy('rowtime)
          +     .select('long.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testGroupingOnProctimeSql(): Unit = {
          +   val util = streamTestUtil()
          +   util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
          +
          +   val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testAggregationOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .groupBy('long)
          +     .select('rowtime.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testAggregationOnProctimeSql(): Unit = {
          +   val util = streamTestUtil()
          +   util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
          +
          +   val result = util.tEnv.sql("SELECT COUNT(proctime) FROM MyTable GROUP BY long")
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test
          + def testTableFunction(): Unit = {
          + val util = streamTestUtil()
          + val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          + val func = new TableFunc
          +
          + val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
          +
          + val expected = unaryNode(
          + "DataStreamCalc",
          + unaryNode(
          + "DataStreamCorrelate",
          + streamTableNode(0),
          + term("invocation",
          + s"$

          {func.functionIdentifier}

          (TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
          + term("function", func),
          + term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
          + "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
          + term("joinType", "INNER")
          + ),
          + term("select",
          + "TIME_MATERIALIZATION(rowtime) AS rowtime",
          + "TIME_MATERIALIZATION(proctime) AS proctime",
          + "s")
          + )
          +
          + util.verifyTable(result, expected)
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testWindowGroupOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .window(Tumble over 100.millis on 'rowtime as 'w)
          +     .groupBy('w, 'rowtime)
          +     .select('w.start, 'rowtime, 'int.sum)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testWindowAggregationOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .window(Tumble over 100.millis on 'rowtime as 'w)
          +     .groupBy('w, 'long)
          +     .select('w.start, 'long, 'rowtime.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test
          + def testWindowStartEnd(): Unit = {
          — End diff –

          This tests only the window end.
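
          For reference, a shape that would also cover the window start (a sketch, not
          necessarily the committed fix):

              val result = t
                .window(Tumble over 100.millis on 'rowtime as 'w)
                .groupBy('w, 'long)
                .select('w.start, 'w.end as 'rowtime, 'long, 'int.sum)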

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115721182

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala —
          @@ -0,0 +1,210 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.datastream
          +
          +import java.math.BigDecimal
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
          +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
          +import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
          +import org.apache.flink.table.api.scala.stream.utils.StreamITCase
          +import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
          +import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.TumblingGroupWindow
          +import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.apache.flink.types.Row
          +import org.junit.Assert._
          +import org.junit.Test
          +
          +import scala.collection.mutable
          +
          +/**
          + * Tests for access and materialization of time attributes.
          + */
          +class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
          +
          + val data = List(
          + (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
          + (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
          + (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
          + (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
          + (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
          + (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
          + (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
          +
          + @Test
          + def testCalcMaterialization(): Unit = {
          +   val env = StreamExecutionEnvironment.getExecutionEnvironment
          +   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +   val tEnv = TableEnvironment.getTableEnvironment(env)
          +   StreamITCase.testResults = mutable.MutableList()
          +
          +   val stream = env
          +     .fromCollection(data)
          +     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
          +   val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
          +
          +   val t = table.select('rowtime.cast(Types.STRING))
          +
          +   val results = t.toDataStream[Row]
          +   results.addSink(new StreamITCase.StringSink)
          +   env.execute()
          +
          +   val expected = Seq(
          +     "1970-01-01 00:00:00.001",
          +     "1970-01-01 00:00:00.002",
          +     "1970-01-01 00:00:00.003",
          +     "1970-01-01 00:00:00.004",
          +     "1970-01-01 00:00:00.007",
          +     "1970-01-01 00:00:00.008",
          +     "1970-01-01 00:00:00.016")
          +   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
          + }
          +
          + @Test
          + def testCalcMaterialization2(): Unit = {
          +   val env = StreamExecutionEnvironment.getExecutionEnvironment
          +   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +   val tEnv = TableEnvironment.getTableEnvironment(env)
          +   StreamITCase.testResults = mutable.MutableList()
          +
          +   val stream = env
          +     .fromCollection(data)
          +     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
          +   val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
          +
          +   val t = table
          +     .filter('rowtime.cast(Types.LONG) > 4)
          +     .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
          +
          +   val results = t.toDataStream[Row]
          +   results.addSink(new StreamITCase.StringSink)
          +   env.execute()
          +
          +   val expected = Seq(
          +     "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
          +     "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
          +     "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
          +   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
          + }
          +
          + @Test
          + def testTableFunction(): Unit = {
          +   val env = StreamExecutionEnvironment.getExecutionEnvironment
          +   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +   val tEnv = TableEnvironment.getTableEnvironment(env)
          +   StreamITCase.testResults = mutable.MutableList()
          +
          +   val stream = env
          +     .fromCollection(data)
          +     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
          +   val table = stream.toTable(
          +     tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
          +   val func = new TableFunc
          +
          +   val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
          +
          +   val results = t.toDataStream[Row]
          +   results.addSink(new StreamITCase.StringSink)
          +   env.execute()
          +
          +   val expected = Seq(
          +     "1970-01-01 00:00:00.001,1true",
          +     "1970-01-01 00:00:00.002,2true",
          +     "1970-01-01 00:00:00.003,3true",
          +     "1970-01-01 00:00:00.004,4true",
          +     "1970-01-01 00:00:00.007,7true",
          +     "1970-01-01 00:00:00.008,8true",
          +     "1970-01-01 00:00:00.016,16true")
          +   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
          + }
          +
          + @Test
          + def testUnion(): Unit = {
          +   val env = StreamExecutionEnvironment.getExecutionEnvironment
          +   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          +   val tEnv = TableEnvironment.getTableEnvironment(env)
          +   StreamITCase.testResults = mutable.MutableList()
          +
          +   val stream = env
          +     .fromCollection(data)
          +     .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
          +   val table = stream.toTable(
          +     tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
          +
          +   val t = table.unionAll(table).select('rowtime)
          +
          +   val results = t.toDataStream[Row]
          +   results.addSink(new StreamITCase.StringSink)
          +   env.execute()
          +
          +   val expected = Seq(
          +     "1970-01-01 00:00:00.001",
          +     "1970-01-01 00:00:00.001",
          +     "1970-01-01 00:00:00.002",
          +     "1970-01-01 00:00:00.002",
          +     "1970-01-01 00:00:00.003",
          +     "1970-01-01 00:00:00.003",
          +     "1970-01-01 00:00:00.004",
          +     "1970-01-01 00:00:00.004",
          +     "1970-01-01 00:00:00.007",
          +     "1970-01-01 00:00:00.007",
          +     "1970-01-01 00:00:00.008",
          +     "1970-01-01 00:00:00.008",
          +     "1970-01-01 00:00:00.016",
          +     "1970-01-01 00:00:00.016")
          +   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
          + }
          +
          +}
          +
          +object TimeAttributesITCase {
          + class TimestampWithEqualWatermark
          + extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
          +
          + override def checkAndGetNextWatermark(
          + lastElement: (Long, Int, Double, Float, BigDecimal, String),
          + extractedTimestamp: Long)
          + : Watermark = {
          +   new Watermark(extractedTimestamp)
          + }
          +
          + override def extractTimestamp(
          + element: (Long, Int, Double, Float, BigDecimal, String),
          + previousElementTimestamp: Long): Long = {
          +   element._1
          + }
          + }
          +
          + class TableFunc extends TableFunction[String] {
          + def eval(time1: Long, time2: Timestamp): Unit = {
          + time1.toString + time2.toString
          — End diff –

          should be `collect(time1.toString + time2.toString)`
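
          Applied to the class above, the fixed function would read:

              class TableFunc extends TableFunction[String] {
                def eval(time1: Long, time2: Timestamp): Unit = {
                  // emit the result instead of discarding the concatenated string
                  collect(time1.toString + time2.toString)
                }
              }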

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115721260

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala —
          @@ -0,0 +1,210 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.datastream
          +
          +import java.math.BigDecimal
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          — End diff –

          Several unused imports

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115719417

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -0,0 +1,284 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.calcite
          +
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.ValidationException
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
          +import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
          +import org.apache.flink.table.functions.TableFunction
          +import org.apache.flink.table.plan.logical.TumblingGroupWindow
          +import org.apache.flink.table.utils.TableTestBase
          +import org.apache.flink.table.utils.TableTestUtil._
          +import org.junit.Test
          +
          +/**
          + * Tests for [[RelTimeIndicatorConverter]].
          + */
          +class RelTimeIndicatorConverterTest extends TableTestBase {
          +
          + @Test
          + def testSimpleMaterialization(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t
          +     .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
          +     .filter('long > 0)
          +     .select('rowtime)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
          +     term("where", ">(long, 0)")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test
          + def testSelectAll(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t.select('*)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
          +       "TIME_MATERIALIZATION(proctime) AS proctime")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test
          + def testFilteringOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
          +     .select('rowtime)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     streamTableNode(0),
          +     term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
          +     term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testGroupingOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          +
          +   val result = t
          +     .groupBy('rowtime)
          +     .select('long.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testGroupingOnProctimeSql(): Unit = {
          +   val util = streamTestUtil()
          +   util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
          +
          +   val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testAggregationOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .groupBy('long)
          +     .select('rowtime.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testAggregationOnProctimeSql(): Unit = {
          +   val util = streamTestUtil()
          +   util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
          +
          +   val result = util.tEnv.sql("SELECT COUNT(proctime) FROM MyTable GROUP BY long")
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test
          + def testTableFunction(): Unit = {
          + val util = streamTestUtil()
          + val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
          + val func = new TableFunc
          +
          + val result = t.join(func('rowtime, 'proctime) as 's).select('rowtime, 'proctime, 's)
          +
          + val expected = unaryNode(
          + "DataStreamCalc",
          + unaryNode(
          + "DataStreamCorrelate",
          + streamTableNode(0),
          + term("invocation",
          + s"$

          {func.functionIdentifier}

          (TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3))"),
          + term("function", func),
          + term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
          + "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
          + term("joinType", "INNER")
          + ),
          + term("select",
          + "TIME_MATERIALIZATION(rowtime) AS rowtime",
          + "TIME_MATERIALIZATION(proctime) AS proctime",
          + "s")
          + )
          +
          + util.verifyTable(result, expected)
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testWindowGroupOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .window(Tumble over 100.millis on 'rowtime as 'w)
          +     .groupBy('w, 'rowtime)
          +     .select('w.start, 'rowtime, 'int.sum)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test(expected = classOf[ValidationException])
          + def testWindowAggregationOnRowtime(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .window(Tumble over 100.millis on 'rowtime as 'w)
          +     .groupBy('w, 'long)
          +     .select('w.start, 'long, 'rowtime.count)
          +
          +   util.verifyTable(result, "FAIL")
          + }
          +
          + @Test
          + def testWindowStartEnd(): Unit = {
          +   val util = streamTestUtil()
          +   val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
          +
          +   val result = t
          +     .window(Tumble over 100.millis on 'rowtime as 'w)
          +     .groupBy('w, 'long)
          +     .select('w.end as 'rowtime, 'long, 'int.sum)
          +
          +   val expected = unaryNode(
          +     "DataStreamCalc",
          +     unaryNode(
          +       "DataStreamGroupWindowAggregate",
          +       streamTableNode(0),
          +       term("groupBy", "long"),
          +       term(
          +         "window",
          +         TumblingGroupWindow(
          +           WindowReference("w"),
          +           'rowtime,
          +           100.millis)),
          +       term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
          +     ),
          +     term("select", "TMP_0 AS rowtime", "long", "TMP_1")
          +   )
          +
          +   util.verifyTable(result, expected)
          + }
          +
          + @Test
          + def testWindowStartEndSql(): Unit = {
          — End diff –

          This tests only the window end.
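
          A SQL shape that would also cover the window start, analogous to the Table API
          case above (a sketch using the standard TUMBLE_START/TUMBLE_END auxiliary
          functions; the committed test may differ):

              val result = util.tEnv.sql(
                "SELECT TUMBLE_START(rowtime, INTERVAL '0.1' SECOND), " +
                "TUMBLE_END(rowtime, INTERVAL '0.1' SECOND), COUNT(long) " +
                "FROM MyTable GROUP BY TUMBLE(rowtime, INTERVAL '0.1' SECOND)")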

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115723393

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala —
@@ -114,101 +78,207 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl

         "Union fields with time attributes have different types.")
     }
-    updatedUnion
+    LogicalUnion.create(inputs, union.all)
   }

+  override def visit(aggregate: LogicalAggregate): RelNode = convertAggregate(aggregate)
+
+  override def visit(minus: LogicalMinus): RelNode =
+    throw new TableException("Logical minus in a stream environment is not supported yet.")
+
+  override def visit(sort: LogicalSort): RelNode =
+    throw new TableException("Logical sort in a stream environment is not supported yet.")
+
+  override def visit(`match`: LogicalMatch): RelNode =
+    throw new TableException("Logical match in a stream environment is not supported yet.")
+
   override def visit(other: RelNode): RelNode = other match {
-    case scan: LogicalTableFunctionScan if
-        stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] =>
+
+    case uncollect: Uncollect =>
       // visit children and update inputs
-      val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan]
-
-      val correlate = stack.peek().asInstanceOf[LogicalCorrelate]
-
-      // check if input field contains time indicator type
-      // materialize field if no time indicator is present anymore
-      // if input field is already materialized, change to timestamp type
-      val materializer = new RexTimeIndicatorMaterializer(
-        rexBuilder,
-        correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType))
-      val newCall = updatedScan.getCall.accept(materializer)
-
-      // copy scan
-      updatedScan.copy(
-        updatedScan.getTraitSet,
-        updatedScan.getInputs,
-        newCall,
-        updatedScan.getElementType,
-        updatedScan.getRowType,
-        updatedScan.getColumnMappings
-      )
+      val input = uncollect.getInput.accept(this)
+      Uncollect.create(uncollect.getTraitSet, input, uncollect.withOrdinality)
+
+    case scan: LogicalTableFunctionScan =>
+      scan
+
+    case aggregate: LogicalWindowAggregate =>
+      val convAggregate = convertAggregate(aggregate)
+
+      LogicalWindowAggregate.create(
+        aggregate.getWindow,
+        aggregate.getNamedProperties,
+        convAggregate)

     case _ =>
-      super.visit(other)
+      throw new TableException(s"Unsupported logical operator: ${other.getClass.getSimpleName}")
   }
+
+  override def visit(exchange: LogicalExchange): RelNode =
+    throw new TableException("Logical exchange in a stream environment is not supported yet.")
+
+  override def visit(scan: TableScan): RelNode = scan
+
+  override def visit(scan: TableFunctionScan): RelNode =
+    throw new TableException("Table function scan in a stream environment is not supported yet.")
+
+  override def visit(values: LogicalValues): RelNode = values
+
+  override def visit(filter: LogicalFilter): RelNode = {
+    // visit children and update inputs
+    val input = filter.getInput.accept(this)
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      input.getRowType.getFieldList.map(_.getType))
+
+    val condition = filter.getCondition.accept(materializer)
+    LogicalFilter.create(input, condition)
+  }
+
+  override def visit(project: LogicalProject): RelNode = {
+    // visit children and update inputs
+    val input = project.getInput.accept(this)
+
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    val materializer = new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      input.getRowType.getFieldList.map(_.getType))
+
+    val projects = project.getProjects.map(_.accept(materializer))
+    val fieldNames = project.getRowType.getFieldNames
+    LogicalProject.create(input, projects, fieldNames)
+  }

-  private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = {
-    val fields = names.zipWithIndex.map { case (name, idx) =>
-      new RelDataTypeFieldImpl(name, idx, types(idx))
-    }
-    new RelRecordType(StructKind.FULLY_QUALIFIED, fields)
-  }
+  override def visit(join: LogicalJoin): RelNode =
+    throw new TableException("Logical join in a stream environment is not supported yet.")
+
+  override def visit(correlate: LogicalCorrelate): RelNode = {
+    // visit children and update inputs
+    val inputs = correlate.getInputs.map(_.accept(this))
+
+    val right = inputs(1) match {
+      case scan: LogicalTableFunctionScan =>
+        // visit children and update inputs
+        val scanInputs = scan.getInputs.map(_.accept(this))
+
+        // check if input field contains time indicator type
+        // materialize field if no time indicator is present anymore
+        // if input field is already materialized, change to timestamp type
+        val materializer = new RexTimeIndicatorMaterializer(
+          rexBuilder,
+          inputs.head.getRowType.getFieldList.map(_.getType))
+
+        val call = scan.getCall.accept(materializer)
+        LogicalTableFunctionScan.create(
+          scan.getCluster,
+          scanInputs,
+          call,
+          scan.getElementType,
+          scan.getRowType,
+          scan.getColumnMappings)
+
+      case _ =>
+        inputs(1)
+    }
+
+    LogicalCorrelate.create(
+      inputs.head,
+      right,
+      correlate.getCorrelationId,
+      correlate.getRequiredColumns,
+      correlate.getJoinType)
+  }
+
+  private def convertAggregate(aggregate: Aggregate): LogicalAggregate = {
— End diff –

          Return a `(Option[LogicalProject], LogicalAggregate)` to cover the case when a `LogicalProject` needs to be prepended to materialize a grouping key or agg function argument (see comments below).
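
          The suggested shape can be pictured with a small, self-contained Scala sketch. The `Rel`, `Scan`, `Project`, and `Aggregate` case classes below are hypothetical stand-ins for Calcite's `RelNode`, `LogicalProject`, and `LogicalAggregate`, and the "is a time indicator" check is a placeholder; only the returned pair mirrors the suggestion:

              // Hypothetical stand-in relational nodes; not the Calcite classes.
              sealed trait Rel
              case class Scan(name: String) extends Rel
              case class Project(exprs: Seq[String], input: Rel) extends Rel
              case class Aggregate(groupKeys: Seq[String], input: Rel) extends Rel

              // Optional pre-projection that materializes time indicators used as
              // grouping keys, plus the aggregate rewired on top of it.
              def convertAggregate(agg: Aggregate): (Option[Project], Aggregate) = {
                val timeKeys = agg.groupKeys.filter(_.endsWith("time")) // stand-in check
                if (timeKeys.isEmpty) {
                  (None, agg)
                } else {
                  val exprs = agg.groupKeys.map {
                    case k if timeKeys.contains(k) => s"TIME_MATERIALIZATION($k)"
                    case k => k
                  }
                  val pre = Project(exprs, agg.input)
                  (Some(pre), agg.copy(input = pre))
                }
              }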

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115715055

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -0,0 +1,284 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.calcite
          +
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.ValidationException
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Tests for [[RelTimeIndicatorConverter]].
+ */
+class RelTimeIndicatorConverterTest extends TableTestBase {
+
+  @Test
+  def testSimpleMaterialization(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
+      .filter('long > 0)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
+      term("where", ">(long, 0)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testSelectAll(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t.select('*)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
+        "TIME_MATERIALIZATION(proctime) AS proctime")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilteringOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+      term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .groupBy('rowtime)
          — End diff –

          Can we support this case as well?
          For example `.groupBy('rowtime + 0.milli)` and `.groupBy('rowtime.floor(TimeIntervalUnit.HOUR))` work correctly, because the expressions are pushed into a Calc which evaluates them. So for `.groupBy('rowtime)`, we would need to inject a Calc with the materialization. How much effort would this be?
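
          For comparison, the two workarounds named above, written against the table `t` from this test (a sketch; both group on an expression over 'rowtime that the planner pushes into a Calc, which materializes the time indicator before the aggregation):

              val byOffset = t
                .groupBy('rowtime + 0.milli) // expression is evaluated in a Calc
                .select('long.count)

              val byFloor = t
                .groupBy('rowtime.floor(TimeIntervalUnit.HOUR))
                .select('long.count)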

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115719116

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -0,0 +1,284 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.calcite
          +
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.table.api.ValidationException
          +import org.apache.flink.table.api.scala._
          +import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+ * Tests for [[RelTimeIndicatorConverter]].
+ */
+class RelTimeIndicatorConverterTest extends TableTestBase {
+
+  @Test
+  def testSimpleMaterialization(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .select('rowtime.floor(TimeIntervalUnit.DAY) as 'rowtime, 'long)
+      .filter('long > 0)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"),
+      term("where", ">(long, 0)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testSelectAll(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t.select('*)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int",
+        "TIME_MATERIALIZATION(proctime) AS proctime")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilteringOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .filter('rowtime > "1990-12-02 12:11:11".toTimestamp)
+      .select('rowtime)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"),
+      term("where", ">(TIME_MATERIALIZATION(rowtime), 1990-12-02 12:11:11)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int, 'proctime.proctime)
+
+    val result = t
+      .groupBy('rowtime)
+      .select('long.count)
+
+    util.verifyTable(result, "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnProctimeSql(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Long, Int)]("MyTable", 'long, 'int, 'proctime.proctime)
+
+    val result = util.tEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+
+    util.verifyTable(result, "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAggregationOnRowtime(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .groupBy('long)
+      .select('rowtime.count)
          — End diff –

          Not sure if it makes sense to have this restriction here. `.select(('rowtime + 0.milli).count)` would work correctly.
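
          Spelled out, the variant the comment says would work, reusing `t` from the test above (a sketch):

              val result = t
                .groupBy('long)
                .select(('rowtime + 0.milli).count) // addition is evaluated in a Calc,
                                                    // so the indicator is materialized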

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3862#discussion_r115721434

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala —
          @@ -0,0 +1,210 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.runtime.datastream
          +
          +import java.math.BigDecimal
          +import java.sql.Timestamp
          +
          +import org.apache.flink.api.scala._
          +import org.apache.flink.streaming.api.TimeCharacteristic
          +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
          +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
          +import org.apache.flink.streaming.api.watermark.Watermark
          +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
+import org.apache.flink.table.expressions.{TimeIntervalUnit, WindowReference}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests for access and materialization of time attributes.
+ */
+class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+  @Test
+  def testCalcMaterialization(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table.select('rowtime.cast(Types.STRING))
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCalcMaterialization2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table
+      .filter('rowtime.cast(Types.LONG) > 4)
+      .select('rowtime, 'rowtime.floor(TimeIntervalUnit.DAY), 'rowtime.ceil(TimeIntervalUnit.DAY))
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.008,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0",
+      "1970-01-01 00:00:00.016,1970-01-01 00:00:00.0,1970-01-02 00:00:00.0")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testTableFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime)
+    val func = new TableFunc
+
+    val t = table.join(func('rowtime, 'proctime) as 's).select('rowtime, 's)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001,1true",
+      "1970-01-01 00:00:00.002,2true",
+      "1970-01-01 00:00:00.003,3true",
+      "1970-01-01 00:00:00.004,4true",
+      "1970-01-01 00:00:00.007,7true",
+      "1970-01-01 00:00:00.008,8true",
+      "1970-01-01 00:00:00.016,16true")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(
+      tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table.unionAll(table).select('rowtime)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.003",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.004",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.007",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.008",
+      "1970-01-01 00:00:00.016",
+      "1970-01-01 00:00:00.016")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object TimeAttributesITCase {
+  class TimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, Double, Float, BigDecimal, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, Double, Float, BigDecimal, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, Double, Float, BigDecimal, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+
+  class TableFunc extends TableFunction[String] {
          — End diff –

          Can be removed because the code uses `RelTimeIndicatorConverterTest.TableFunc`.
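
          That is, the ITCase can construct the shared helper through the import it already declares, rather than keeping a local duplicate (a sketch):

              import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc

              val func = new TableFunc // same helper the planner test defines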

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3862

          Thanks for the update @twalthr!
          Looks very good. Will merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3862

          fhueske Fabian Hueske added a comment -

          Fixed for 1.3 with 2c65843fc9be41771e8a2dc32de3be30fd641260
          Fixed for 1.4 with b50ef4b8de73e0e19df154d87ea588236e3ccb43


            People

• Assignee: twalthr Timo Walther
• Reporter: twalthr Timo Walther
• Votes: 0
• Watchers: 3
