FLINK-5710

Add ProcTime() function to indicate StreamSQL

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels: None

      Description

      procTime() is a parameterless scalar function that only indicates processing-time mode.

          Activity

          fhueske Fabian Hueske added a comment -

          We have to think a bit more about this function. In join queries we also need a reference to the table. Not sure if a dummy function is the best solution...

          stefano.bortoli Stefano Bortoli added a comment -

          Sure. I could trigger the first PR today with what I have, and then start discussing around it. What do you think? Or should I wait for, or produce, more detailed specifications?

          fhueske Fabian Hueske added a comment -

          That sounds good. Let's start with a simple "marker function" to not block the development of all windows.
          We need to come up with a better design and replace that later, i.e., before the next release in 4 months.

          stefano.bortoli Stefano Bortoli added a comment -

          Just to track this: is what you had in mind related to the time granularity of the timestamp? For example, procTime(N * TIMEUNIT.SECOND/MILLISECOND), where the function returns a normalized processing time to support the join beyond mere coincidence?

          fhueske Fabian Hueske added a comment -

          Actually, the function should not return anything. It is just a marker to indicate processing time semantics and would be removed during query translation.

          wheat9 Haohui Mai added a comment - - edited

          Can you please elaborate a little bit more, particularly how to generate a user friendly message? For example:

          SELECT FLOOR(procTime() TO HOURS) as ts, COUNT(*) FROM foo GROUP BY FLOOR(procTime() TO HOURS)
          

          The query translation can recognize certain usages of procTime() (e.g., FLOOR), but other usages might result in mysterious error messages (e.g., "cannot do codegen for the procTime() function"). Any ideas?
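
One possible way to get a friendlier message, sketched here purely as an illustration: after the window patterns have been translated, scan the remaining expressions and fail early if a marker function is still present. The class and method names below (MarkerCheckSketch, requireNoMarker) are hypothetical and are not part of Flink or Calcite; expressions are modeled as plain strings to keep the sketch self-contained.

```java
import java.util.Locale;

/** Hypothetical post-translation check; none of these names come from Flink or Calcite. */
final class MarkerCheckSketch {

    /** Marker functions that are expected to be consumed by a window rewrite. */
    private static final String[] MARKERS = {"proctime()", "rowtime()"};

    /**
     * Throws a descriptive error if an expression that survived translation
     * still calls a marker function; otherwise returns normally.
     */
    static void requireNoMarker(String expression) {
        // Normalize whitespace and case so "procTime( )" and "PROCTIME()" are both caught.
        String normalized = expression.replace(" ", "").toLowerCase(Locale.ROOT);
        for (String marker : MARKERS) {
            if (normalized.contains(marker)) {
                throw new IllegalArgumentException(
                    "Unsupported use of " + marker + " in '" + expression + "': "
                    + "marker functions are only valid in recognized window patterns "
                    + "such as GROUP BY FLOOR(" + marker + " TO HOURS).");
            }
        }
    }
}
```

With a check like this, an unsupported query would fail with a message that names the marker function and the supported pattern, rather than a code generation error.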

          fhueske Fabian Hueske added a comment -

          The idea is to apply a kind of pattern matching for certain expressions such as GROUP BY FLOOR(procTime() TO HOURS).
          The challenge is that this expression is spread across several LogicalRel nodes.
          The expression FLOOR(procTime() TO HOURS) will be moved into a LogicalProject (or LogicalCalc) to create a new attribute. A following LogicalAggregate will then use that attribute as a grouping column.

          Once we detect such a pattern, we have to rewrite the plan and replace the LogicalAggregate and parts of the LogicalProject by a LogicalWindowAggregate. The LogicalWindowAggregate includes a window definition. Depending on which marker function is used (rowtime or proctime) the window definition is either for a processing or an event time window. After the translation, the function is no longer available.

          With this approach, we can only translate very specific queries. However, I don't think we can easily provide a generic translation for SQL window queries.
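
The rewrite described above can be sketched with a toy plan model. This is only an illustration under simplifying assumptions: Node is a stand-in for the real LogicalRel classes, expressions are plain strings, and the Project is assumed to contain nothing but the FLOOR expression, so it is dropped entirely instead of being partially replaced. All names in the sketch (WindowRewriteSketch, Node, rewrite) are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of the plan rewrite; these are not Calcite's or Flink's real classes. */
final class WindowRewriteSketch {

    /** Minimal stand-in for a logical plan node with at most one input. */
    static class Node {
        final String kind;    // e.g. "Scan", "Project", "Aggregate", "WindowAggregate"
        final String detail;  // expression, table name, or window description
        final Node input;     // null for leaf nodes

        Node(String kind, String detail, Node input) {
            this.kind = kind;
            this.detail = detail;
            this.input = input;
        }

        @Override
        public String toString() {
            String self = kind + "(" + detail + ")";
            return input == null ? self : self + " <- " + input;
        }
    }

    /** Matches FLOOR(procTime() TO <unit>) or FLOOR(rowTime() TO <unit>). */
    private static final Pattern FLOOR_MARKER = Pattern.compile(
        "\\s*FLOOR\\(\\s*(procTime|rowTime)\\(\\)\\s+TO\\s+(\\w+)\\s*\\)\\s*",
        Pattern.CASE_INSENSITIVE);

    /**
     * Replaces Aggregate <- Project(FLOOR(marker() TO unit)) with a single
     * WindowAggregate. Plans that do not match the pattern are returned unchanged.
     */
    static Node rewrite(Node root) {
        if (root.kind.equals("Aggregate")
                && root.input != null
                && root.input.kind.equals("Project")) {
            Matcher m = FLOOR_MARKER.matcher(root.input.detail);
            if (m.matches()) {
                // The marker decides the time mode; afterwards it is gone from the plan.
                String mode = m.group(1).equalsIgnoreCase("procTime")
                    ? "processing-time" : "event-time";
                String window = mode + " tumble by " + m.group(2).toUpperCase();
                return new Node("WindowAggregate", window + ", " + root.detail, root.input.input);
            }
        }
        return root;
    }

    public static void main(String[] args) {
        Node scan = new Node("Scan", "foo", null);
        Node project = new Node("Project", "FLOOR(procTime() TO HOURS)", scan);
        Node aggregate = new Node("Aggregate", "COUNT(*)", project);
        System.out.println(rewrite(aggregate));
    }
}
```

Because only this one shape is recognized, any other placement of the marker function leaves the plan untouched, which matches the limitation noted above that only very specific queries can be translated.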

          wheat9 Haohui Mai added a comment -

          Just FYI: FLINK-5624 implements the rowTime() function. procTime() can probably be done in a similar way.

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3271

          @fhueske not sure this was noticed.

          githubbot ASF GitHub Bot added a comment -

          Github user tonycox commented on the issue:

          https://github.com/apache/flink/pull/3271

          Hi @huawei-flink,
          please name your PR following the "[FLINK-XXX] Jira title text" pattern.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100804906

          --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          --- End diff --

          Please implement this class in Scala.
          We don't want to unnecessarily mix Java and Scala in the same project.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100804434

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ProcTimeCallGen.scala ---
          @@ -0,0 +1,40 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.codegen.calls
          +
          +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
          +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}

          +
          +/**
          + * Generates function call to determine current time point (as date/time/timestamp) in
          --- End diff --

          Please update the documentation.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100806301

          --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          +
          + /**
          + * An explicit representation of TIMESTAMP as an SQL return type
          + */
          + private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
          +
          + /**
          + * A a parameterless scalar function that just indicates processing time mode.
          --- End diff --

          Typo "A a parameterless"

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100806529

          --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          --- End diff --

          Rename class to `TimeModeIndicatorFunctions`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100806237

          --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          +
          + /**
          + * An explicit representation of TIMESTAMP as an SQL return type
          + */
          + private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
          --- End diff --

          I think we can inline this type.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100807508

          --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/StreamAggregationSqlITCase.java ---
          @@ -0,0 +1,81 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.api.java.stream.sql;
          +
          +import org.apache.flink.table.api.java.StreamTableEnvironment;
          +import org.apache.flink.api.java.tuple.Tuple5;
          +import org.apache.flink.types.Row;
          +import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
          +import org.apache.flink.table.api.Table;
          +import org.apache.flink.table.api.TableEnvironment;
          +import org.apache.flink.streaming.api.datastream.DataStream;
          +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
          +import org.apache.flink.table.api.java.stream.utils.StreamTestData;
          +import org.junit.Ignore;
          +import org.junit.Test;
          +
          +import java.util.ArrayList;
          +import java.util.List;
          +
          +public class StreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
          --- End diff --

          We test functions and expressions not with integration tests (too time intensive) but by extending the `ExpressionTestBase` class. We could add the tests for `procTime` to the `TemporalTypesTest` class.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100819100

          --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          +
          + /**
          + * An explicit representation of TIMESTAMP as an SQL return type
          + */
          + private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
          +
          + /**
          + * A a parameterless scalar function that just indicates processing time mode.
          + */
          + public static final SqlFunction PROCTIME = new SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION, TIMESTAMP, null, OperandTypes.NILADIC, SqlFunctionCategory.TIMEDATE);
          — End diff –

          The SQL function should also override `getMonotonicity()` to indicate that the returned timestamp is increasing.
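To illustrate what an "increasing" declaration promises, here is a minimal plain-Java sketch. It is not Calcite or Flink code: the names `MonotonicitySketch`, `floorToHour` and `preservesOrder` are hypothetical and exist only for this illustration. It assumes the usual reading of the hint, namely that an order-preserving expression over an increasing timestamp (such as flooring to the hour) can serve as a grouping key on a stream, while an expression that does not preserve order (such as the offset within the hour) cannot.

```java
import java.util.function.LongUnaryOperator;

public class MonotonicitySketch {
    static final long HOUR_MS = 3_600_000L;

    // Hypothetical helper: floor an epoch-millisecond timestamp to the start of its hour.
    static long floorToHour(long ts) {
        return Math.floorDiv(ts, HOUR_MS) * HOUR_MS;
    }

    // True if applying f to a non-decreasing input yields a non-decreasing output.
    static boolean preservesOrder(long[] nonDecreasingInput, LongUnaryOperator f) {
        for (int i = 1; i < nonDecreasingInput.length; i++) {
            if (f.applyAsLong(nonDecreasingInput[i - 1]) > f.applyAsLong(nonDecreasingInput[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in values for successive processing-time readings (assumed non-decreasing).
        long[] procTimes = {1_000L, 3_599_999L, 3_600_000L, 7_200_001L};
        System.out.println(preservesOrder(procTimes, MonotonicitySketch::floorToHour)); // true
        System.out.println(preservesOrder(procTimes, ts -> ts % HOUR_MS));               // false
    }
}
```

In the PR itself the equivalent statement would be made by the function object returning an increasing monotonicity from its `getMonotonicity()` override, rather than by checking values at runtime.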

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3271

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3271

          @huawei-flink I'm very sorry, I accidentally closed this PR instead of #3277. Can you reopen it?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100844814

          — Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java —
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          +
          + /**
          + * An explicit representation of TIMESTAMP as an SQL return type
          + */
          + private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
          +
          + /**
          + * A a parameterless scalar function that just indicates processing time mode.
          + */
          + public static final SqlFunction PROCTIME = new SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION, TIMESTAMP, null, OperandTypes.NILADIC, SqlFunctionCategory.TIMEDATE);
          — End diff –

          Have a look at the `EventTimeExtractor` object of PR #3252.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r100849148

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala —
          @@ -190,11 +192,14 @@ object FunctionCatalog {
          // array
          "cardinality" -> classOf[ArrayCardinality],
          "at" -> classOf[ArrayElementAt],
          - "element" -> classOf[ArrayElement]
          + "element" -> classOf[ArrayElement],
          + "procTime" -> classOf[CurrentTimestamp]
          — End diff –

          Make `procTime` lowercase, i.e., `proctime`, to keep it consistent with `rowtime`.

          githubbot ASF GitHub Bot added a comment -

          GitHub user huawei-flink opened a pull request:

          https://github.com/apache/flink/pull/3302

          FLINK-5710 Add ProcTime() function to indicate StreamSQL


          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/huawei-flink/flink FLINK-5710

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3302.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3302



          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Thanks for reopening the PR!
          I made a few comments on #3271 before and after it was closed that should be addressed.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r101454598

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ProcTimeCallGen.scala —
          @@ -0,0 +1,40 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.table.codegen.calls
          +
          +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
          +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
          +
          +/**
          + * Generates function call to determine current time point (as date/time/timestamp) in
          — End diff –

          I am not sure I understand this point. Should the generate method throw an exception, or should the code for the exception be generated?

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3271#discussion_r101455204

          — Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java —
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to you under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.calcite.functions;
          +
          +import org.apache.calcite.sql.SqlFunction;
          +import org.apache.calcite.sql.SqlFunctionCategory;
          +import org.apache.calcite.sql.SqlKind;
          +import org.apache.calcite.sql.type.OperandTypes;
          +import org.apache.calcite.sql.type.ReturnTypes;
          +import org.apache.calcite.sql.type.SqlReturnTypeInference;
          +import org.apache.calcite.sql.type.SqlTypeName;
          +
          +public class FlinkStreamFunctionCatalog {
          +
          + /**
          + * An explicit representation of TIMESTAMP as an SQL return type
          + */
          + private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
          +
          + /**
          + * A a parameterless scalar function that just indicates processing time mode.
          + */
          + public static final SqlFunction PROCTIME = new SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION, TIMESTAMP, null, OperandTypes.NILADIC, SqlFunctionCategory.TIMEDATE);
          — End diff –

          Should I simply create the same class, using PROCTIME in place of ROWTIME?

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3302

          @fhueske I've addressed most of the points; however, one thing is still not clear to me. So far, the procTime() function generates a timestamp. My understanding is that this is not correct and that it should return something else. Could it be a default timestamp (e.g. the epoch)? The actual timestamp normalized to the second? What is the best option in your opinion?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Hi @huawei-flink,

          thanks for the update! The changes are now pretty much aligned with #3252. I'm in the process of merging #3252 (running last tests). It would be great if you could rebase your changes on top of the master once #3252 has been merged.

          In order to test the feature, you can integrate `proctime()` into the `LogicalWindowAggregateRule` and extend the `WindowAggregateTest` for processing time tumbling windows.

          Thanks, Fabian
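As a rough picture of what a processing-time tumbling window test would exercise, the following plain-Java sketch assigns arrival times to fixed-size windows and counts events per window. It is an illustration only and does not use Flink's `LogicalWindowAggregateRule` or `WindowAggregateTest`; the names `ProcTimeTumblingSketch`, `windowStart` and `countPerWindow` are hypothetical, and the arrival times stand in for the wall-clock readings an operator would take when each record is processed.

```java
import java.util.Map;
import java.util.TreeMap;

public class ProcTimeTumblingSketch {
    // Start (inclusive) of the tumbling window of the given size that contains ts.
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Counts events per window, keyed by window start in ascending order.
    static Map<Long, Integer> countPerWindow(long[] arrivalTimes, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : arrivalTimes) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed arrival times in milliseconds; a real job would read the system clock.
        long[] arrivals = {100L, 900L, 1_000L, 1_500L, 2_999L};
        System.out.println(countPerWindow(arrivals, 1_000L)); // {0=2, 1000=2, 2000=1}
    }
}
```

Because processing time depends on when records happen to arrive, a deterministic test would more plausibly assert on the translated plan than on window contents like these.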

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          FYI: PR #3252 was just merged.

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3370

          FLINK-5710 Add ProcTime() function to indicate StreamSQL.

          This is the commit we used internally. There are no unit tests associated with this PR; it simply serves as a reference point for #3302.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-5710

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3370.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3370


          commit 7aaa5008c7b49ce48e01d40dc4a04a6211eaf79b
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-02-20T21:13:58Z

          FLINK-5710 Add ProcTime() function to indicate StreamSQL.


          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on the issue:

          https://github.com/apache/flink/pull/3302

          FYI: #3370 is the commit we use internally for this feature. Please feel free to take it if it helps implementing this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Hi @huawei-flink, do you plan to follow up on this PR?
          Otherwise, I'd merge #3370 after adding a test.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3370

          Thanks @haohui! The only thing missing would in fact be a test. IMO, extending `WindowAggregateTest` by copying the methods and adapting them to `proctime` would be sufficient.

          Let's see if #3302 is continued or not. If not, I'd like to add this PR.
          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3302

          Hi Fabian,

          I will follow up in the next few days, probably early next week. Is that OK?

          Stefano

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3302

          I managed to merge the changes from PR #3370 into my branch after rebasing, and the test works. I will push the code later today. Sorry if I am a little slow to respond; I am travelling and connectivity is sometimes a problem.

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3370

          @haohui thanks for the contribution. I merged your code and will push it later today.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          That would be great @huawei-flink! There are a few contributions waiting for this addition.
          Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Hi, it looks like something went wrong when updating the PR.
          Can you rebase your changes on the latest master?

          Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user haohui closed the pull request at:

          https://github.com/apache/flink/pull/3370

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui reopened a pull request:

          https://github.com/apache/flink/pull/3370

          FLINK-5710 Add ProcTime() function to indicate StreamSQL.

          This is the commit we used internally. There are no unit tests associated with this PR; it simply serves as a reference point for #3302.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-5710

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3370.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3370


          commit e68a7ad22cad926dac2f211fa3bd56ef481c4036
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-02-23T21:51:45Z

          FLINK-5710 Add ProcTime() function to indicate StreamSQL.


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3370

          Thanks for the update @haohui!
          PR looks good.
          Will merge it later.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Hi @huawei-flink,
          as I said before, there are a few contributions waiting for this patch.
          I decided to move on and merge PR #3370.

          Thanks again for working on this and sorry for the inconvenience,
          Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3370#discussion_r102925889

          --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
          @@ -94,6 +94,27 @@ class WindowAggregateTest extends TableTestBase {
               streamUtil.verifySql(sql, expected)
             }

          +  @Test
          +  def testProcessingTime() = {
          +    val sql = "SELECT COUNT FROM MyTable GROUP BY FLOOR(proc() TO HOUR)"
          --- End diff --

          `proc()` -> `proctime()`
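
The query in this test groups rows by `FLOOR(proctime() TO HOUR)`, i.e. by the hour in which each row happens to be processed. As a rough illustration of that grouping, here is a minimal plain-Java sketch. It is not Flink's implementation and uses no Flink APIs: the class `ProcTimeHourBuckets`, the injected clock, and the method names are all hypothetical, and hours are floored in UTC as a simplifying assumption.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustration only: hypothetical names, not part of Flink. */
public class ProcTimeHourBuckets {

    // Hypothetical stand-in for proctime(): the wall-clock time at which a row is processed.
    private final Supplier<Instant> procTime;

    // Row count per hour bucket, keyed by the start of the hour (UTC).
    private final Map<Instant, Long> counts = new LinkedHashMap<>();

    public ProcTimeHourBuckets(Supplier<Instant> procTime) {
        this.procTime = procTime;
    }

    /** In the spirit of FLOOR(ts TO HOUR): drops minutes, seconds, and anything smaller. */
    public static Instant floorToHour(Instant ts) {
        return ts.truncatedTo(ChronoUnit.HOURS);
    }

    /** Counts one row; the bucket depends only on when the row is seen, not on its content. */
    public void process(Object row) {
        counts.merge(floorToHour(procTime.get()), 1L, Long::sum);
    }

    /** Returns the counts collected so far, in order of first appearance of each hour. */
    public Map<Instant, Long> counts() {
        return counts;
    }
}
```

Passing the clock in as a `Supplier<Instant>` keeps the sketch deterministic under test; with `Instant::now` as the supplier, rows would be bucketed by real wall-clock time, which is the behaviour the marker function is meant to signal.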

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3370

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3302

          fhueske Fabian Hueske added a comment -

          Implemented for 1.3.0 with a755de27b85fe72be4a6f2063225ddc5c7f69058

          githubbot ASF GitHub Bot added a comment -

          Github user huawei-flink commented on the issue:

          https://github.com/apache/flink/pull/3302

          @fhueske no problem, I understand. It was bad timing, as I was travelling for more than two weeks, with little time to follow this. We'll contribute on other issues.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3302

          Thanks @huawei-flink
          Looking forward to your contributions!


            People

            • Assignee:
              wheat9 Haohui Mai
              Reporter:
              stefano.bortoli Stefano Bortoli
            • Votes:
              0
              Watchers:
              6
