FLINK-6491: Add QueryConfig to specify state retention time for streaming queries

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Implemented
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      By now we have a couple of streaming operators (group windows, over windows, non-windowed aggregations) that require operator state. Since state is not automatically cleaned up by Flink, we need to add a mechanism to configure a state retention time.

      If configured, a query will retain state for a specified period of state inactivity. If state is not accessed within this period, it will be cleared. I propose to add two parameters for this, a min and a max retention time. The min retention time specifies the earliest point and the max retention time the latest point at which state is cleared. The reasoning for having two parameters is that we can avoid registering many timers if we have more freedom in when to discard state.

      This issue also introduces a QueryConfig object which can be passed to a streaming query, when it is emitted to a TableSink or converted to a DataStream (append or retraction).
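      The reasoning behind the min/max pair can be illustrated with a small standalone model. All class and method names below are made up for illustration (this is not the Flink API); the registration rule mirrors the logic in the PR's `ProcessFunctionWithCleanupState`: a new cleanup timer is registered only once the earliest permissible cleanup time has drifted past the previously registered timer by the slack `max - min`.

```java
// Standalone sketch of the min/max retention idea: a new cleanup timer is
// registered only when the earliest allowed cleanup time has moved past the
// previously registered timer plus the slack interval (max - min).
// All names here are illustrative; this is not Flink API.
class RetentionTimerSketch {
    private final long minRetention;
    private final long interval; // max - min: slack that lets one timer cover many accesses
    private Long lastCleanupTime; // latest registered cleanup timer, if any
    private int timersRegistered;

    RetentionTimerSketch(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.interval = maxRetention - minRetention;
    }

    /** Called on every state access at processing time {@code now}. */
    void onStateAccess(long now) {
        long earliestCleanup = now + minRetention;
        if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + interval) {
            // register a new timer and remember its firing time
            lastCleanupTime = earliestCleanup + interval;
            timersRegistered++;
        }
    }

    int timersRegistered() {
        return timersRegistered;
    }
}
```

      For example, with min = 10 and max = 20, ten accesses at times 0..9 register a single timer, whereas with min == max (no slack) every access would register a fresh timer.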

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3863

          FLINK-6491 [table] Add QueryConfig to specify state retention time for streaming queries

          This PR includes the following changes:
          1. Add QueryConfig and state clean up for non-windowed aggregates.
          2. Add QueryConfig and state clean up for over-windowed aggregates.

          • [x] General
          • The pull request references the related JIRA issue ("FLINK-6491 [table] Add QueryConfig to specify state retention time for streaming queries")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6491-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3863.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3863


          commit 2d168a924d5a607343579793b64f621aa15419bd
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2017-05-08T16:41:37Z

          FLINK-6491 [table] Add QueryConfig and state clean up for non-windowed aggregates.

          commit f8b2ef3d27ef73142679ab50882a46c895074947
          Author: sunjincheng121 <sunjincheng121@gmail.com>
          Date: 2017-05-09T06:36:42Z

          FLINK-6491 [table] Add QueryConfig and state clean up for over-windowed aggregates.


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115731631

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
          // the naming pattern for internally registered tables.
          private val internalNamePattern = "^DataStreamTable[0-9]+$".r

          + def qConf: StreamQueryConfig = new StreamQueryConfig
          — End diff –

          rename to `queryConfig`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115734636

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala —
          @@ -100,9 +104,18 @@ class DataStreamGroupAggregate(
          inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
          }

          - override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
          + override def translateToPlan(
          +     tableEnv: StreamTableEnvironment,
          +     qConfig: StreamQueryConfig): DataStream[CRow] = {
          +
          +   if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
          — End diff –

          we should also check that this is not a global aggregate and only emit a warning if the aggregate is partitioned.
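          The suggested guard might look roughly like the following standalone sketch. The `< 0` convention for "retention not configured" comes from the quoted diff; the method and parameter names are illustrative, not actual Flink code.

```java
// Illustrative sketch of the reviewer's suggestion; not actual Flink code.
class UnboundedStateWarning {
    // Warn only when no retention time is configured (values < 0 mean "unset"
    // here, matching the check in the quoted diff) and the aggregate is
    // partitioned: a global aggregate holds a single state entry, so its
    // state cannot grow without bound.
    static boolean shouldWarn(long minRetention, long maxRetention, boolean isPartitioned) {
        boolean retentionUnconfigured = minRetention < 0 || maxRetention < 0;
        return retentionUnconfigured && isPartitioned;
    }
}
```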

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115731151

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala —
          @@ -113,9 +113,20 @@ abstract class BatchTableEnvironment(
              *
              * @param table The [[Table]] to write.
              * @param sink The [[TableSink]] to write the [[Table]] to.
          +   * @param qConfig The configuration for the query to generate.
              * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
              */
          - override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
          + override private[flink] def writeToSink[T](
          +     table: Table,
          +     sink: TableSink[T],
          +     qConfig: QueryConfig): Unit = {
          +
          +   // We do not pass the configuration on, because there is nothing to configure for batch queries.
          +   val bQConfig = qConfig match {
          — End diff –

          can change this to
          ```
          qConfig match {
            case _: BatchQueryConfig =>
            case _ => throw new TableException("BatchQueryConfig required to configure batch query.")
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115779043

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala —
          @@ -220,15 +74,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
          testHarness.processElement(new StreamRecord(
          CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))

          -    testHarness.setProcessingTime(2)
          +    // trigger cleanup timer and register cleanup timer with 6001
          +    testHarness.setProcessingTime(3001)
               testHarness.processElement(new StreamRecord(
                 CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
               testHarness.processElement(new StreamRecord(
                 CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
               testHarness.processElement(new StreamRecord(
                 CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
          +
          +    // using historical data and register cleanup timer with 9000
          — End diff –

          what do you mean by historical data?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115734490

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -239,6 +248,7 @@ class DataStreamOverAggregate(
          }

          def createBoundedAndCurrentRowOverWindow(
          — End diff –

          we should add a warning similar to `DataStreamGroupAggregate` if the windows are partitioned. The non-partitioned case can be cleaned up as well, but is not so critical.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115747425

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala —
          @@ -19,15 +19,299 @@ package org.apache.flink.table.runtime.harness

           import java.util.{Comparator, Queue => JQueue}

          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
           import org.apache.flink.api.common.typeinfo.TypeInformation
           import org.apache.flink.api.java.functions.KeySelector
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
           import org.apache.flink.streaming.api.operators.OneInputStreamOperator
           import org.apache.flink.streaming.api.watermark.Watermark
           import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
           import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
          -import org.apache.flink.table.runtime.types.CRow
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.codegen.GeneratedAggregationsFunction
          +import org.apache.flink.table.functions.AggregateFunction
          +import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction, IntSumWithRetractAggFunction}
          +import org.apache.flink.table.runtime.aggregate.AggregateUtil
          +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}

           class HarnessTestBase {
          +
          +  protected var qConfig =
          — End diff –

          I would not make this part of the test base.
          IMO, the tests are easier to read if we do not have to check the harness for all implementations.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115739260

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
          +  extends ProcessFunction[IN, OUT] {
          +
          +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          +  // interval in which clean-up timers are registered
          +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          +  // holds the latest registered cleanup timer
          +  private var cleanupTimeState: ValueState[JLong] = _
          +
          +  protected def initCleanupTimeState(stateName: String) {
          +    if (stateCleaningEnabled) {
          +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
          +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
          +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          +    }
          +  }
          +
          +  protected def registerProcessingCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +      val earliestCleanup = currentTime + minRetentionTime
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          +        // we need to register a new timer
          +        val cleanupTime = earliestCleanup + cleanupTimerInterval
          +        // register timer and remember clean-up time
          +        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
          +        cleanupTimeState.update(cleanupTime)
          +      }
          +    }
          +  }
          +
          +  protected def registerEventCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +      val earliestCleanup = currentTime + minRetentionTime
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          +        // we need to register a new timer
          +        val cleanupTime = earliestCleanup + cleanupTimerInterval
          +        // register timer and remember clean-up time
          +        ctx.timerService().registerEventTimeTimer(cleanupTime)
          +        cleanupTimeState.update(cleanupTime)
          +      }
          +    }
          +  }
          +
          +  protected def cleanupStateOnTimer(timestamp: Long, states: State*): Boolean = {
          — End diff –

          we need to add the `OnTimerContext` as a parameter as well to check if this is a processing time timer call.
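          The point about checking the timer's time domain can be sketched as follows. The `TimeDomain` values mirror Flink's `TimeDomain` enum (available via `OnTimerContext`); the rest of the class is an illustrative standalone model, not the actual implementation.

```java
import java.util.List;

// Minimal sketch: state is cleared only when the firing timer is a
// processing-time timer that matches the registered cleanup time.
// Names are illustrative; this is not the actual Flink class.
class CleanupOnTimerSketch {
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    private Long registeredCleanupTime;

    void registerCleanup(long cleanupTime) {
        registeredCleanupTime = cleanupTime;
    }

    /**
     * Returns true (and clears the given state values) only if this timer
     * call is the pending processing-time cleanup timer.
     */
    boolean cleanupStateOnTimer(long timestamp, TimeDomain domain, List<Object> states) {
        if (domain != TimeDomain.PROCESSING_TIME) {
            return false; // event-time timers must not trigger state cleanup
        }
        if (registeredCleanupTime == null || registeredCleanupTime != timestamp) {
            return false; // stale timer; a newer cleanup timer is still pending
        }
        states.clear();
        registeredCleanupTime = null;
        return true;
    }
}
```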

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115775350

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
          +  extends ProcessFunction[IN, OUT] {
          +
          +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          +  // interval in which clean-up timers are registered
          +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          +  // holds the latest registered cleanup timer
          +  private var cleanupTimeState: ValueState[JLong] = _
          +
          +  protected def initCleanupTimeState(stateName: String) {
          +    if (stateCleaningEnabled) {
          +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
          +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
          +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          +    }
          +  }
          +
          +  protected def registerProcessingCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +      val earliestCleanup = currentTime + minRetentionTime
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          +        // we need to register a new timer
          +        val cleanupTime = earliestCleanup + cleanupTimerInterval
          +        // register timer and remember clean-up time
          +        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
          +        cleanupTimeState.update(cleanupTime)
          +      }
          +    }
          +  }
          +
          +  protected def registerEventCleanupTimer(
          — End diff –

          I'm not sure if we should define state cleanup on event time.
          If we have an event-time window followed by a non-windowed aggregate, we would use event-time and processing time for cleaning up. Event-time is also harder to reason about than processing time.
          The RocksDB TTL is probably also defined in terms of wall clock time.

          So I would propose to drop this method and always use processing time for cleanup timers.
          What do you think @sunjincheng121?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115734432

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -182,6 +189,7 @@ class DataStreamOverAggregate(
          }

          def createUnboundedAndCurrentRowOverWindow(
          — End diff –

          we should add a warning similar to `DataStreamGroupAggregate` if the windows are partitioned. The non-partitioned case can be cleaned up as well, but is not so critical.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115739685

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala —
          @@ -43,8 +44,9 @@ class RowTimeBoundedRowsOver(
          genAggregations: GeneratedAggregationsFunction,
          aggregationStateType: RowTypeInfo,
          inputRowType: CRowTypeInfo,

          -    precedingOffset: Long)
          -  extends ProcessFunction[CRow, CRow]
          +    precedingOffset: Long,
          +    qConfig: StreamQueryConfig)
          +  extends ProcessFunctionWithCleanupState[CRow, CRow](qConfig)
          — End diff –

          indent +2

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115750947

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -182,6 +189,7 @@ class DataStreamOverAggregate(
          }

          def createUnboundedAndCurrentRowOverWindow(
          — End diff –

          Also we should check for processing time over range windows that `minIdleStateRetentionTime` > preceding interval. Otherwise, we cannot compute this correctly.
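          The check suggested here can be sketched as a small validation helper. This is a hypothetical Python model of the rule, not Flink code; `validate_retention` and its error message are invented for illustration:

```python
# Hypothetical model of the suggested check: for a processing-time OVER RANGE
# window, idle-state cleanup must not be allowed to fire while rows are still
# inside the preceding interval, i.e. minIdleStateRetentionTime must exceed it.

def validate_retention(min_idle_state_retention_ms, preceding_interval_ms):
    """Raise if the configured retention could truncate the OVER window."""
    if 0 < min_idle_state_retention_ms <= preceding_interval_ms:
        raise ValueError(
            "minIdleStateRetentionTime must be larger than the preceding "
            "interval of a processing-time OVER RANGE window")

validate_retention(10_000, 5_000)   # fine: retention outlives the window
# validate_retention(5_000, 10_000) would raise: cleanup could drop live rows
```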

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115778205

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
          +  extends ProcessFunction[IN, OUT] {
          +
          +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          +  // interval in which clean-up timers are registered
          +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          +  // holds the latest registered cleanup timer
          +  private var cleanupTimeState: ValueState[JLong] = _
          +
          +  protected def initCleanupTimeState(stateName: String) {
          +    if (stateCleaningEnabled) {
          +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
          +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
          +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          +    }
          +  }
          +
          +  protected def registerProcessingCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +
          +      val earliestCleanup = currentTime + minRetentionTime
          +
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          — End diff –

          It would also be good to extend one of the test cases to cover the corner cases of the clean up logic (1 ms before min retention time, 1 ms before max retention time, 1 ms after min retention, 1 ms after max retention).
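          The four corner cases listed here amount to a simple timing contract. The following is a hypothetical Python model of that contract (not the actual harness test), assuming example retention settings of min = 2000 ms and max = 5000 ms:

```python
# Hypothetical contract model: state last touched at `last_access` must survive
# until at least last_access + MIN_RETENTION, and must be gone strictly after
# last_access + MAX_RETENTION. Values are example settings, not Flink defaults.

MIN_RETENTION = 2000  # ms
MAX_RETENTION = 5000  # ms

def may_be_cleaned(last_access, now):
    """True iff cleanup is *allowed* to have happened by `now`."""
    return now >= last_access + MIN_RETENTION

def must_be_cleaned(last_access, now):
    """True iff cleanup *must* have happened by `now`."""
    return now > last_access + MAX_RETENTION

# the corner cases from the review, 1 ms around each boundary:
assert not may_be_cleaned(0, 1999)   # 1 ms before min retention: state kept
assert may_be_cleaned(0, 2001)       # 1 ms after min retention: may be gone
assert not must_be_cleaned(0, 4999)  # 1 ms before max retention: may survive
assert must_be_cleaned(0, 5001)      # 1 ms after max retention: must be gone
```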

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115739101

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
          +  extends ProcessFunction[IN, OUT] {
          +
          +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          +  // interval in which clean-up timers are registered
          +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          +  // holds the latest registered cleanup timer
          +  private var cleanupTimeState: ValueState[JLong] = _
          +
          +  protected def initCleanupTimeState(stateName: String) {
          +    if (stateCleaningEnabled) {
          +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
          +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
          +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          +    }
          +  }
          +
          +  protected def registerProcessingCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +
          +      val earliestCleanup = currentTime + minRetentionTime
          +
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          +        // we need to register a new timer
          +        val cleanupTime = earliestCleanup + cleanupTimerInterval
          +        // register timer and remember clean-up time
          +        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
          +        cleanupTimeState.update(cleanupTime)
          +      }
          +    }
          +  }
          +
          +  protected def registerEventCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +
          +      val earliestCleanup = currentTime + minRetentionTime
          +
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          +        // we need to register a new timer
          +        val cleanupTime = earliestCleanup + cleanupTimerInterval
          +        // register timer and remember clean-up time
          +        ctx.timerService().registerEventTimeTimer(cleanupTime)
          +        cleanupTimeState.update(cleanupTime)
          +      }
          +    }
          +  }
          +
          +  protected def cleanupStateOnTimer(timestamp: Long, states: State*): Boolean = {
          +    var result: Boolean = false
          +    if (stateCleaningEnabled) {
          +      val cleanupTime = cleanupTimeState.value()
          +      if (null != cleanupTime && timestamp == cleanupTime) {
          — End diff –

          We need to extend the condition to
          ```
          (null != cleanupTime && timestamp == cleanupTime && ctx.timeDomain == TimeDomain.PROCESSING_TIME)
          ```
          to ensure that this is actually a processing-time timer.
          Otherwise, event-time operators might clear the state prematurely if we process records with timestamps that lie a bit in the future.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115776739

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN, OUT](qConfig: StreamQueryConfig)
          +  extends ProcessFunction[IN, OUT] {
          +
          +  protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          +  protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          +  protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          +  // interval in which clean-up timers are registered
          +  protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          +  // holds the latest registered cleanup timer
          +  private var cleanupTimeState: ValueState[JLong] = _
          +
          +  protected def initCleanupTimeState(stateName: String) {
          +    if (stateCleaningEnabled) {
          +      val inputCntDescriptor: ValueStateDescriptor[JLong] =
          +        new ValueStateDescriptor[JLong](stateName, Types.LONG)
          +      cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          +    }
          +  }
          +
          +  protected def registerProcessingCleanupTimer(
          +      ctx: ProcessFunction[IN, OUT]#Context,
          +      currentTime: Long): Unit = {
          +    if (stateCleaningEnabled) {
          +
          +      val earliestCleanup = currentTime + minRetentionTime
          +
          +      // last registered timer
          +      val lastCleanupTime = cleanupTimeState.value()
          +
          +      if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          — End diff –

          This condition is wrong and would lead to state being discarded before the min retention time. It should be `(lastCleanupTime == null || earliestCleanup > lastCleanupTime)`.

          With this condition, the timer logic works as follows:

          • If the earliest time at which the state may be cleaned (`earliestCleanup`) is later than the last registered timer (`lastCleanupTime`), we need to register a new (later) timer.
          • The new timer is registered for current time + max retention interval, such that all records that arrive from now on until `currentTime + cleanupTimerInterval` can reuse this timer.

          Please double check the logic @sunjincheng121.
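          The corrected timer logic can be simulated outside Flink. This hypothetical Python sketch (invented names; example settings min = 2000 ms, max = 5000 ms) shows how one timer is reused by subsequent records and only replaced when the min-retention guarantee would otherwise be violated:

```python
# Simulation of the corrected registration logic: a new cleanup timer is
# registered at currentTime + maxRetentionTime only if the earliest allowed
# cleanup (currentTime + minRetentionTime) is later than the existing timer.

MIN_RETENTION = 2000  # ms, example value
MAX_RETENTION = 5000  # ms, example value

def register_cleanup_timer(current_time, last_cleanup_time):
    """Return the registered cleanup timer after seeing a record at current_time."""
    earliest_cleanup = current_time + MIN_RETENTION
    if last_cleanup_time is None or earliest_cleanup > last_cleanup_time:
        # register a new, later timer at current time + max retention
        return current_time + MAX_RETENTION
    return last_cleanup_time  # existing timer already respects min retention

t = register_cleanup_timer(0, None)  # first record: timer at 5000
t = register_cleanup_timer(1000, t)  # earliest cleanup 3000 <= 5000: reuse timer
t = register_cleanup_timer(3500, t)  # earliest cleanup 5500 > 5000: new timer at 8500
```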

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115735538

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala —
          @@ -54,6 +55,7 @@ class GroupAggProcessFunction(
          private var state: ValueState[Row] = _
          // counts the number of added and retracted input records
          private var cntState: ValueState[JLong] = _
          + // holds the latest registered cleanup timer
          — End diff –

          remove comment

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115750977

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -239,6 +248,7 @@ class DataStreamOverAggregate(
          }

          def createBoundedAndCurrentRowOverWindow(
          — End diff –

          Also we should check for processing time over range windows that `minIdleStateRetentionTime` > preceding interval. Otherwise, we cannot compute this correctly.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115778815

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala —
          @@ -265,7 +123,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
          Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1))
          expectedOutput.add(new StreamRecord(
          CRow(

          - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2))
          + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true), 2))
          — End diff –

          The state for this record should not have been deleted yet. The last input for that key is at time `1100` and the min retention time is `2000ms`, so the state should be kept at least until `3100`, but it was already discarded at `3001` (this is the bug that I pointed out earlier).
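The arithmetic in this comment can be checked with a small sketch (the values `1100` and `2000` come from the comment; `earliestCleanup` mirrors the corresponding computation in the cleanup-state helper, but the class and method names here are illustrative):

```java
// Sketch of the retention-time invariant discussed above.
// Assumption: state accessed at time t must be retained at least until t + minRetention.
public class RetentionCheck {
    static long earliestCleanup(long lastAccessTime, long minRetentionTime) {
        return lastAccessTime + minRetentionTime;
    }

    // True if a cleanup timer firing at cleanupTime would discard state too early.
    static boolean cleanupTooEarly(long lastAccessTime, long minRetentionTime, long cleanupTime) {
        return cleanupTime < earliestCleanup(lastAccessTime, minRetentionTime);
    }
}
```

With last access at `1100` and min retention `2000ms`, the earliest legal cleanup is `3100`, so a cleanup firing at `3001` is a bug.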

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115888057

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala —
          @@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
          // the naming pattern for internally registered tables.
          private val internalNamePattern = "^DataStreamTable[0-9]+$".r

          + def qConf: StreamQueryConfig = new StreamQueryConfig
          — End diff –

          Yes, I want to change all `qConfig` and `qConf` to `queryConfig`.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115907975

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala —
          @@ -0,0 +1,99 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
          +import org.apache.flink.api.common.state.State
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +
          +abstract class ProcessFunctionWithCleanupState[IN,OUT](qConfig: StreamQueryConfig)
          + extends ProcessFunction[IN, OUT]{
          +
          + protected val minRetentionTime = qConfig.getMinIdleStateRetentionTime
          + protected val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
          + protected val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
          + // interval in which clean-up timers are registered
          + protected val cleanupTimerInterval = maxRetentionTime - minRetentionTime
          +
          + // holds the latest registered cleanup timer
          + private var cleanupTimeState: ValueState[JLong] = _
          +
          + protected def initCleanupTimeState(stateName: String) {
          + if (stateCleaningEnabled) {
          + val inputCntDescriptor: ValueStateDescriptor[JLong] =
          + new ValueStateDescriptor[JLong](stateName, Types.LONG)
          + cleanupTimeState = getRuntimeContext.getState(inputCntDescriptor)
          + }
          + }
          +
          + protected def registerProcessingCleanupTimer(
          + ctx: ProcessFunction[IN, OUT]#Context,
          + currentTime: Long): Unit = {
          + if (stateCleaningEnabled) {
          +
          + val earliestCleanup = currentTime + minRetentionTime
          +
          + // last registered timer
          + val lastCleanupTime = cleanupTimeState.value()
          +
          + if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + cleanupTimerInterval) {
          + // we need to register a new timer
          + val cleanupTime = earliestCleanup + cleanupTimerInterval
          + // register timer and remember clean-up time
          + ctx.timerService().registerProcessingTimeTimer(cleanupTime)
          + cleanupTimeState.update(cleanupTime)
          + }
          + }
          + }
          + protected def registerEventCleanupTimer(
          — End diff –

          The reason I added this method is that in a `row-time` OVER window (TimeDomain.EVENT_TIME), we always get `0` when we call `ctx.timerService.currentProcessingTime`.
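The timer-registration logic quoted in this diff can be illustrated with a simplified, Flink-free model (the `registered` list stands in for `ctx.timerService()`; class and field names are illustrative, not Flink API). The point of the min/max pair is batching: a new timer is only registered once the earliest allowed cleanup has moved past the last registered timer's interval, so frequent state accesses do not create one timer per access.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of registerProcessingCleanupTimer from the diff above.
public class CleanupTimers {
    private final long minRetention;
    private final long interval;          // maxRetention - minRetention, the batching window
    private Long lastCleanupTime = null;  // last registered timer, null if none yet
    final List<Long> registered = new ArrayList<>(); // stands in for the timer service

    CleanupTimers(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.interval = maxRetention - minRetention;
    }

    void onStateAccess(long currentTime) {
        long earliestCleanup = currentTime + minRetention;
        // only register a new timer if the old one no longer covers this access
        if (lastCleanupTime == null || earliestCleanup >= lastCleanupTime + interval) {
            long cleanupTime = earliestCleanup + interval;
            registered.add(cleanupTime);
            lastCleanupTime = cleanupTime;
        }
    }
}
```

With min retention 2000ms and max retention 5000ms, accesses at times 0 and 1000 share a single timer at 5000; only a much later access (e.g. at 6000) triggers a second timer.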

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3863#discussion_r115908788

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -239,6 +248,7 @@ class DataStreamOverAggregate(
          }

          def createBoundedAndCurrentRowOverWindow(
          — End diff –

          I do not think we need to add this check, because both `row-based` and `time-based` windows can produce incorrect results due to state cleanup. The important thing is to make users aware of the effect of the `cleanup config`.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3863

          Hi @fhueske, thanks a lot for your review. I have updated the PR with the following changes:

          • Unified the naming: changed all `qConfig` and `qConf` to `queryConfig`.
          • Added a warning log to `DataStreamOverAggregate`.
          • Fixed the cleanup logic.
          • Extended the test cases to cover the corner cases of the cleanup logic.

          But there are two things I have not done:

          • `check for processing time over range windows that minIdleStateRetentionTime > preceding interval.`
            I do not think we need to add this check, because both row-based and time-based windows can produce incorrect results due to state cleanup. The important thing is to make users aware of the effect of the cleanup config.
          • `remove the registerEventCleanupTimer method`
            The reason I added this method is that in a row-time OVER window (TimeDomain.EVENT_TIME), we always get 0 when we call ctx.timerService.currentProcessingTime.

          What do you think?
          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3863

          fhueske Fabian Hueske added a comment -

          Implemented for 1.3 with f0868c5d77d1ace9989a7ee56b24368d63bff138
          Implemented for 1.4 with 2480887180d881c30d228a73c746f94abbcbbb64


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              fhueske Fabian Hueske
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development