Flink / FLINK-6583

Enable QueryConfig in count-based GroupWindow

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0, 1.4.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Enable QueryConfig in count-based GroupWindows by adding a custom Trigger, `CountTriggerWithCleanupState`. See FLINK-6491 for more details.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3919

          FLINK-6583 [table] Enable QueryConfig in count-based GroupWindow

          This PR enables `QueryConfig` for count-based `GroupWindow`s.

          • [x] General
          • The pull request references the related JIRA issue ("FLINK-6583 [table] Enable QueryConfig in count-based GroupWindow")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [x] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6583-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3919.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3919


          commit 8f54635006b8caa2770d552209e7ad7fe2475f96
          Author: sunjincheng121 <sunjincheng121@gmail.com>
          Date: 2017-05-16T03:58:37Z

          FLINK-6583 [table] Enable QueryConfig in count base GroupWindow


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116727812

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -296,6 +322,8 @@ object DataStreamGroupWindowAggregate {
          case SlidingGroupWindow(_, timeField, size, slide)
          if isProctimeAttribute(timeField) && isRowCountLiteral(size)=>
          stream.countWindowAll(toLong(size), toLong(slide))
          + .evictor(CountEvictor.of(toLong(size)))
          — End diff –

          same as above

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116727685

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -245,6 +266,8 @@ object DataStreamGroupWindowAggregate {
          case SlidingGroupWindow(_, timeField, size, slide)
          if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
          stream.countWindow(toLong(size), toLong(slide))
          + .evictor(CountEvictor.of(toLong(size)))
          — End diff –

          Do we need to override the evictor as well?
          I think it should not be changed if we add a custom trigger.
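To illustrate the reviewer's point, here is a Flink-free sketch. It assumes (from our reading of the DataStream API, not from this thread) that `countWindow(size, slide)` is assembled from a `GlobalWindows` assigner, `CountEvictor.of(size)` and `CountTrigger.of(slide)`, so a later `.trigger(...)` call replaces only the firing policy and leaves the evictor in place. `SlidingCountWindowModel`, `TriggerModel` and `EveryNth` are hypothetical names: eviction is fixed inside `run`, while the trigger is a parameter that can be swapped without re-declaring the evictor.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of a sliding count window: the evictor keeps the
// last `size` elements, and a separately supplied trigger decides when to fire.
object SlidingCountWindowModel {

  // Firing policy abstraction; any implementation can be plugged into `run`.
  trait TriggerModel {
    def onElement(): Boolean
  }

  // Fires on every `slide`-th element, loosely modelling CountTrigger.of(slide).
  final class EveryNth(slide: Long) extends TriggerModel {
    private var count = 0L
    def onElement(): Boolean = {
      count += 1
      if (count >= slide) { count = 0L; true } else false
    }
  }

  // The eviction rule is fixed here; only the trigger is a parameter, so swapping
  // the trigger does not require touching the evictor.
  def run(input: Seq[Int], size: Int, trigger: TriggerModel): List[List[Int]] = {
    val buffer = ArrayBuffer.empty[Int]
    val fired = ArrayBuffer.empty[List[Int]]
    for (x <- input) {
      buffer += x
      if (trigger.onElement()) {
        // Evict the oldest elements before emitting, keeping at most `size`.
        if (buffer.size > size) buffer.remove(0, buffer.size - size)
        fired += buffer.toList
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit =
    println(run(1 to 6, size = 3, trigger = new EveryNth(2)))
    // prints List(List(1, 2), List(2, 3, 4), List(4, 5, 6))
}
```

Replacing `EveryNth` with a trigger that also manages cleanup timers changes when windows fire, but the `size`-based eviction stays exactly as declared once.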

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116733901

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala —
          @@ -0,0 +1,146 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.triggers
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.functions.ReduceFunction
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          +import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +
          +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          + extends Trigger[Any, W] {
          +
          + private val serialVersionUID: Long = 1L
          +
          + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
          + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
          + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
          +
          + private val stateDesc: ReducingStateDescriptor[JLong] =
          + new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
          +
          + private val cleanupStateDesc: ValueStateDescriptor[JLong] =
          + new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
          +
          + override def canMerge: Boolean = true
          +
          + override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
          + ctx.mergePartitionedState(stateDesc)
          — End diff –

          This will fail because `ValueState` is not a `MergingState`. We could implement `cleanupStateDesc` as a `ReducingState` with a `max` `ReduceFunction`, which would allow Flink to merge the state.

          On the other hand, we won't use the Trigger with merging windows anyway, so it would also be fine not to support merging at all (`canMerge = false`). I'd actually go for this option because it simplifies the trigger.
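A minimal, Flink-free sketch of the first option, assuming a hypothetical `TriggerState` that holds the element count and the registered cleanup timestamp: counts are combined with a sum (as the quoted `Sum` function does) and cleanup timestamps with `max`, so a merged window is never scheduled for cleanup earlier than either of its inputs. None of these names are Flink API, and the reviewer's preferred option (`canMerge = false`) makes this merging logic unnecessary.

```scala
// Hypothetical model of merging per-window trigger state; not Flink API.
object MergeTriggerStateModel {
  // Element count plus the registered cleanup timestamp, if any.
  final case class TriggerState(count: Long, cleanupTime: Option[Long])

  // Counts are summed (like the `Sum` ReduceFunction); cleanup times are combined
  // with max so the merged window is not cleaned up earlier than either input.
  def merge(a: TriggerState, b: TriggerState): TriggerState = {
    val cleanup = (a.cleanupTime.toList ++ b.cleanupTime.toList)
      .reduceOption((x, y) => math.max(x, y))
    TriggerState(a.count + b.count, cleanup)
  }

  def main(args: Array[String]): Unit = {
    val merged = merge(TriggerState(3L, Some(3001L)), TriggerState(2L, Some(4500L)))
    println(merged) // TriggerState(5,Some(4500))
  }
}
```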

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116739961

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala —
          @@ -0,0 +1,305 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.harness
          +
          +import com.google.common.collect.Lists
          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
          +import org.junit.Assert.assertEquals
          +import org.junit.Test
          +
          +class CountTriggerWithCleanupStateHarnessTest {
          + protected var queryConfig =
          + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
          +
          + @Test
          + def testFiringAndFireingWithPurging(): Unit = {
          + val testHarness = new TriggerTestHarness[Any, TimeWindow](
          + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
          +
          + // try to trigger onProcessingTime method via 1, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(1).size())
          +
          + // register cleanup timer with 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          — End diff –

          Could we reuse the same `StreamRecord` and `GlobalWindow` objects to keep the code more concise?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116740788

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala —
          @@ -0,0 +1,146 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.triggers
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.functions.ReduceFunction
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          +import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +
          +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          + extends Trigger[Any, W] {
          +
          + private val serialVersionUID: Long = 1L
          +
          + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
          + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
          + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
          +
          + private val stateDesc: ReducingStateDescriptor[JLong] =
          + new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
          +
          + private val cleanupStateDesc: ValueStateDescriptor[JLong] =
          + new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
          +
          + override def canMerge: Boolean = true
          +
          + override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
          + ctx.mergePartitionedState(stateDesc)
          + }
          +
          + override def toString: String = "CountTriggerWithCleanupState(" +
          + "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " +
          + "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " +
          + "maxCount=" + maxCount + ")"
          +
          + override def onElement(
          + element: Any,
          + timestamp: Long,
          + window: W,
          + ctx: TriggerContext): TriggerResult = {
          +
          + val currentTime = ctx.getCurrentProcessingTime
          +
          + // register cleanup timer
          + if (stateCleaningEnabled) {
          + // last registered timer
          + val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
          +
          + // check if a cleanup timer is registered and
          + // that the current cleanup timer won't delete state we need to keep
          + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
          + // we need to register a new (later) timer
          + val cleanupTime = currentTime + maxRetentionTime
          + // register timer and remember clean-up time
          + ctx.registerProcessingTimeTimer(cleanupTime)
          +
          + if (null != curCleanupTime) {
          + ctx.deleteProcessingTimeTimer(curCleanupTime)
          + }
          +
          + ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
          + }
          + }
          +
          + val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
          + count.add(1L)
          +
          + if (count.get >= maxCount) {
          + count.clear()
          + return TriggerResult.FIRE
          + }
          +
          + return TriggerResult.CONTINUE
          — End diff –

          remove `return`
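The quoted `onElement` logic can be written without `return` by making the final `if`/`else` the method's last expression. The sketch below is a hypothetical, Flink-free model, not the PR's code: `State` stands in for the `count` and `countCleanup` state, and a mutable set stands in for the registered processing-time timers. With the harness test's retention times (2 s and 3 s) and a first element at processing time 1, it registers a cleanup timer at 3001, matching the test comment.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of the quoted onElement logic, written without `return`.
object CountTriggerModel {
  sealed trait Result
  case object Continue extends Result
  case object Fire extends Result

  // Stand-ins for the "count" ReducingState, the "countCleanup" ValueState and the timer service.
  final class State {
    var count: Long = 0L
    var cleanupTime: Option[Long] = None
    val registeredTimers: mutable.Set[Long] = mutable.Set.empty[Long]
  }

  final class CountTriggerSketch(minRetention: Long, maxRetention: Long, maxCount: Long) {
    private val stateCleaningEnabled = minRetention > 1

    def onElement(currentTime: Long, state: State): Result = {
      if (stateCleaningEnabled) {
        // A new (later) timer is needed if none exists or the current one would
        // delete state before the minimum retention time has passed.
        val needsNewTimer = state.cleanupTime.forall(t => currentTime + minRetention > t)
        if (needsNewTimer) {
          val cleanupTime = currentTime + maxRetention
          state.registeredTimers += cleanupTime
          state.cleanupTime.foreach(old => state.registeredTimers -= old)
          state.cleanupTime = Some(cleanupTime)
        }
      }

      state.count += 1
      // The if/else is the method's last expression, so no `return` is needed.
      if (state.count >= maxCount) {
        state.count = 0L
        Fire
      } else {
        Continue
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Retention of 2 s / 3 s as in the harness test; first element at processing time 1 ms.
    val trigger = new CountTriggerSketch(minRetention = 2000L, maxRetention = 3000L, maxCount = 10L)
    val state = new State
    println(trigger.onElement(1L, state)) // Continue
    println(state.cleanupTime)            // Some(3001)
  }
}
```

In the Flink trigger itself, the same change amounts to ending `onElement` with `if (count.get >= maxCount) { count.clear(); TriggerResult.FIRE } else TriggerResult.CONTINUE`.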

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116740271

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala —
          @@ -0,0 +1,146 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.triggers
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.functions.ReduceFunction
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          +import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +
          +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          — End diff –

          We can type the trigger to `GlobalWindow`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116740828

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala —
          @@ -0,0 +1,146 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.triggers
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.functions.ReduceFunction
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          +import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +
          +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          + extends Trigger[Any, W] {
          +
          + private val serialVersionUID: Long = 1L
          +
          + protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
          + protected val maxRetentionTime: Long = queryConfig.getMaxIdleStateRetentionTime
          + protected val stateCleaningEnabled: Boolean = minRetentionTime > 1
          +
          + private val stateDesc: ReducingStateDescriptor[JLong] =
          + new ReducingStateDescriptor[JLong]("count", new Sum, Types.LONG)
          +
          + private val cleanupStateDesc: ValueStateDescriptor[JLong] =
          + new ValueStateDescriptor[JLong]("countCleanup", Types.LONG)
          +
          + override def canMerge: Boolean = true
          +
          + override def onMerge(window: W, ctx: Trigger.OnMergeContext) {
          + ctx.mergePartitionedState(stateDesc)
          + }
          +
          + override def toString: String = "CountTriggerWithCleanupState(" +
          + "minIdleStateRetentionTime=" + queryConfig.getMinIdleStateRetentionTime + ", " +
          + "maxIdleStateRetentionTime=" + queryConfig.getMaxIdleStateRetentionTime + ", " +
          + "maxCount=" + maxCount + ")"
          +
          + override def onElement(
          + element: Any,
          + timestamp: Long,
          + window: W,
          + ctx: TriggerContext): TriggerResult = {
          +
          + val currentTime = ctx.getCurrentProcessingTime
          +
          + // register cleanup timer
          + if (stateCleaningEnabled) {
          + // last registered timer
          + val curCleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
          +
          + // check if a cleanup timer is registered and
          + // that the current cleanup timer won't delete state we need to keep
          + if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
          + // we need to register a new (later) timer
          + val cleanupTime = currentTime + maxRetentionTime
          + // register timer and remember clean-up time
          + ctx.registerProcessingTimeTimer(cleanupTime)
          +
          + if (null != curCleanupTime) {
          + ctx.deleteProcessingTimeTimer(curCleanupTime)
          + }
          +
          + ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
          + }
          + }
          +
          + val count: ReducingState[JLong] = ctx.getPartitionedState(stateDesc)
          + count.add(1L)
          +
          + if (count.get >= maxCount) {
          + count.clear()
          + return TriggerResult.FIRE
          + }
          +
          + return TriggerResult.CONTINUE
          + }
          +
          + override def onProcessingTime(
          + time: Long,
          + window: W,
          + ctx: TriggerContext): TriggerResult = {
          +
          + if (stateCleaningEnabled) {
          + val cleanupTime = ctx.getPartitionedState(cleanupStateDesc).value()
          + // check that the triggered timer is the last registered processing time timer.
          + if (null != cleanupTime && time == cleanupTime) {
          + clear(window, ctx)
          + return TriggerResult.FIRE_AND_PURGE
          + }
          + }
          + return TriggerResult.CONTINUE
          — End diff –

          Remove `return`.
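A hedged sketch of what removing `return` amounts to: in Scala the value of a method's last expression is its result, so each early `return` can become a branch of an `if`/`else` expression. To keep the example runnable without Flink on the classpath, `ReturnFreeSketch`, its `Result` type, and both methods are hypothetical stand-ins for `TriggerResult`, `onElement`, and `onProcessingTime`; the state access and the `clear(window, ctx)` call of the reviewed trigger are reduced to plain parameters.

```scala
object ReturnFreeSketch {
  // Hypothetical stand-ins for Flink's TriggerResult values (not the Flink API).
  sealed trait Result extends Product with Serializable
  case object Continue extends Result
  case object Fire extends Result
  case object FireAndPurge extends Result

  // Counterpart of the tail of onElement: fire once the count reaches maxCount.
  // In the reviewed trigger the first branch would be `{ count.clear(); TriggerResult.FIRE }`.
  def afterElement(count: Long, maxCount: Long): Result =
    if (count >= maxCount) Fire else Continue

  // Counterpart of onProcessingTime: fire and purge only when the timer that went off
  // is the most recently registered cleanup timer; otherwise continue.
  // In the reviewed trigger the first branch would be `{ clear(window, ctx); TriggerResult.FIRE_AND_PURGE }`.
  def onTimer(time: Long, cleanupTime: Option[Long], cleaningEnabled: Boolean): Result =
    if (cleaningEnabled && cleanupTime.contains(time)) FireAndPurge
    else Continue
}
```

Because both `if`/`else` forms are expressions, the method body is the result, and no control flow escapes early.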

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116729795

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/CountTriggerWithCleanupState.scala —
          @@ -0,0 +1,146 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.triggers
          +
          +import java.lang.{Long => JLong}
          +
          +import org.apache.flink.api.common.functions.ReduceFunction
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          +import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          +import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.table.api.{StreamQueryConfig, Types}
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +
          +class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          — End diff –

          Rename to `StateCleaningCountTrigger`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116738258

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala —
          @@ -0,0 +1,305 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.harness
          +
          +import com.google.common.collect.Lists
          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
          +import org.junit.Assert.assertEquals
          +import org.junit.Test
          +
          +class CountTriggerWithCleanupStateHarnessTest {
          — End diff –

          Rename to `StateCleaningCountTrigger`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116740173

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala —
          @@ -0,0 +1,305 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.harness
          +
          +import com.google.common.collect.Lists
          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
          +import org.junit.Assert.assertEquals
          +import org.junit.Test
          +
          +class CountTriggerWithCleanupStateHarnessTest {
          + protected var queryConfig =
          + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
          +
          + @Test
          + def testFiringAndFireingWithPurging(): Unit = {
          + val testHarness = new TriggerTestHarness[Any, TimeWindow](
          + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
          +
          + // try to trigger onProcessingTime method via 1, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(1).size())
          +
          + // register cleanup timer with 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // try to trigger onProcessingTime method via 1000, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(1000).size())
          +
          + // 1000 + 2000 <= 3001 reuse timer 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // there are two state entries, one is timer(3001) another is counter(2)
          + assertEquals(2, testHarness.numStateEntries)
          +
          + // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered
          + assertEquals(
          + TriggerResult.FIRE_AND_PURGE,
          + testHarness.advanceProcessingTime(3001).iterator().next().f1)
          +
          + assertEquals(0, testHarness.numStateEntries)
          +
          + // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // try to trigger onProcessingTime method via 4002, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(4002).size())
          +
          + // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // 4002 + 2000 <= 7002 reuse timer 7002
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // have one timer 7002
          + assertEquals(1, testHarness.numProcessingTimeTimers)
          + assertEquals(0, testHarness.numEventTimeTimers)
          + assertEquals(2, testHarness.numStateEntries)
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
          — End diff –

          There is only a single global window, so we do not need to test for different windows, IMO.
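The timestamps in the test comments (3001, 6001, 7002) follow from one rule in the reviewed `onElement`: register a new cleanup timer at `now + maxRetentionTime` only when no timer exists or when `now + minRetentionTime` would pass the current one. A minimal Flink-free sketch of that rule, using the test's 2 s / 3 s retention; `CleanupTimerRule` and `nextCleanupTime` are hypothetical names for illustration only.

```scala
object CleanupTimerRule {
  // Returns Some(newTimer) when onElement should register a later cleanup timer,
  // or None when the currently registered timer can be reused.
  // Mirrors: curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime
  def nextCleanupTime(
      now: Long,
      current: Option[Long],
      minRetention: Long,
      maxRetention: Long): Option[Long] =
    if (current.forall(t => now + minRetention > t)) Some(now + maxRetention)
    else None
}
```

Read against this rule, `1000 + 2000 = 3000` does not pass 3001, so the timer is reused, while `4002 + 2000 = 6002` passes 6001, so 7002 replaces it.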

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116739074

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala —
          @@ -0,0 +1,305 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.harness
          +
          +import com.google.common.collect.Lists
          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
          +import org.junit.Assert.assertEquals
          +import org.junit.Test
          +
          +class CountTriggerWithCleanupStateHarnessTest {
          + protected var queryConfig =
          + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
          +
          + @Test
          + def testFiringAndFireingWithPurging(): Unit = {
          + val testHarness = new TriggerTestHarness[Any, TimeWindow](
          + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
          — End diff –

          Count windows are based on `GlobalWindow`. Use a `GlobalWindow` instead of a `TimeWindow`.
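Count windows assign every element of a key to the same global window, so trigger state exists once per key and a model of the trigger needs no window argument at all. The sketch below models the reviewed trigger for one key under that assumption, without Flink: `CountCleanupModel`, `CountModel`, and their members are hypothetical, the timer set stands in for the harness's processing-time timers, and resetting the count and cleanup time when the cleanup timer fires is an assumption about what the trigger's `clear` does (its body is not shown in this diff).

```scala
import scala.collection.mutable

object CountCleanupModel {
  sealed trait Result extends Product with Serializable
  case object Continue extends Result
  case object Fire extends Result
  case object FireAndPurge extends Result

  // One key in one global window: no window parameter anywhere.
  final class CountModel(minRetention: Long, maxRetention: Long, maxCount: Long) {
    private val cleaningEnabled = minRetention > 1
    var count: Long = 0L                  // plays the role of the "count" reducing state
    var cleanupTime: Option[Long] = None  // plays the role of the "countCleanup" value state
    val timers: mutable.SortedSet[Long] = mutable.SortedSet.empty[Long] // pending processing-time timers

    def onElement(now: Long): Result = {
      if (cleaningEnabled && cleanupTime.forall(t => now + minRetention > t)) {
        val next = now + maxRetention
        cleanupTime.foreach(old => timers -= old) // drop the superseded timer first
        timers += next
        cleanupTime = Some(next)
      }
      count += 1
      if (count >= maxCount) { count = 0L; Fire } else Continue
    }

    // Fires every timer due at or before `now`, in time order.
    def advanceTo(now: Long): List[Result] = {
      val due = timers.filter(_ <= now).toList
      timers --= due
      due.map { t =>
        if (cleaningEnabled && cleanupTime.contains(t)) {
          count = 0L                      // assumed effect of clear(window, ctx)
          cleanupTime = None
          FireAndPurge
        } else Continue
      }
    }
  }
}
```

Replaying the timeline of `testFiringAndFireingWithPurging` against this model (elements at 1, 1000, 3001, 4002, 4002 with a fire at 3001) ends with a single pending timer at 7002 and a count of 3; since there is no second window, there is no other window state left to assert on.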

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3919#discussion_r116739074 — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala — @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.harness + +import com.google.common.collect.Lists +import org.apache.flink.api.common.time.Time +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState +import org.junit.Assert.assertEquals +import org.junit.Test + +class CountTriggerWithCleanupStateHarnessTest { + protected var queryConfig = + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + + @Test + def testFiringAndFireingWithPurging(): Unit = { + val testHarness = new TriggerTestHarness [Any, TimeWindow] ( + CountTriggerWithCleanupState.of [TimeWindow] (queryConfig, 10), new TimeWindow.Serializer) — End diff – Count windows are based on `GlobalWindow`. Use a `GlobalWindow` instead of a `TimeWindow`.
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116741416

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala —
          @@ -0,0 +1,305 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.harness
          +
          +import com.google.common.collect.Lists
          +import org.apache.flink.api.common.time.Time
          +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
          +import org.apache.flink.table.api.StreamQueryConfig
          +import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
          +import org.junit.Assert.assertEquals
          +import org.junit.Test
          +
          +class CountTriggerWithCleanupStateHarnessTest {
          + protected var queryConfig =
          + new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
          +
          + @Test
          + def testFiringAndFireingWithPurging(): Unit = {
          + val testHarness = new TriggerTestHarness[Any, TimeWindow](
          + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new TimeWindow.Serializer)
          +
          + // try to trigger onProcessingTime method via 1, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(1).size())
          +
          + // register cleanup timer with 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // try to trigger onProcessingTime method via 1000, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(1000).size())
          +
          + // 1000 + 2000 <= 3001 reuse timer 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // there are two state entries, one is timer(3001) another is counter(2)
          + assertEquals(2, testHarness.numStateEntries)
          +
          + // try to trigger onProcessingTime method via 3001, and timer(3001) is triggered
          + assertEquals(
          + TriggerResult.FIRE_AND_PURGE,
          + testHarness.advanceProcessingTime(3001).iterator().next().f1)
          +
          + assertEquals(0, testHarness.numStateEntries)
          +
          + // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // try to trigger onProcessingTime method via 4002, but there is non timer is triggered
          + assertEquals(0, testHarness.advanceProcessingTime(4002).size())
          +
          + // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // 4002 + 2000 <= 7002 reuse timer 7002
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // have one timer 7002
          + assertEquals(1, testHarness.numProcessingTimeTimers)
          + assertEquals(0, testHarness.numEventTimeTimers)
          + assertEquals(2, testHarness.numStateEntries)
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
          +
          + // 4002 + 2000 <= 7002 reuse timer 7002
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // register cleanup timer via 7002 for window (9, 18)
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          +
          + // there are four state entries
          + assertEquals(4, testHarness.numStateEntries)
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(9, 18)))
          +
          + // the window counter triggered, count >= 10
          + assertEquals(
          + TriggerResult.FIRE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(9, 18)))
          +
          + // counter of window(9, 18) is cleared
          + assertEquals(3, testHarness.numStateEntries)
          +
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          + assertEquals(
          + TriggerResult.FIRE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 9)))
          +
          + // counter of window(0, 9) is cleared
          + assertEquals(2, testHarness.numStateEntries)
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 9)))
          + assertEquals(1, testHarness.numStateEntries(new TimeWindow(9, 18)))
          +
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(18, 27)))
          +
          + assertEquals(4, testHarness.numStateEntries)
          +
          + // try to trigger onProcessingTime method via 7002, and all states are cleared
          + assertEquals(
          + TriggerResult.FIRE_AND_PURGE,
          + testHarness.advanceProcessingTime(7002).iterator().next().f1)
          +
          + assertEquals(0, testHarness.numStateEntries)
          + }
          +
          + /**
          + * Verify that clear() does not leak across windows.
          + */
          + @Test
          + def testClear() {
          + val testHarness = new TriggerTestHarness[Any, TimeWindow](
          + CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 3),
          + new TimeWindow.Serializer)
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(0, 2)))
          + assertEquals(
          + TriggerResult.CONTINUE,
          + testHarness.processElement(new StreamRecord[Any](1), new TimeWindow(2, 4)))
          + // have 2 timers
          + assertEquals(2, testHarness.numProcessingTimeTimers)
          + assertEquals(0, testHarness.numEventTimeTimers)
          + assertEquals(4, testHarness.numStateEntries)
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2)))
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(2, 4)))
          + testHarness.clearTriggerState(new TimeWindow(2, 4))
          + assertEquals(2, testHarness.numStateEntries)
          + assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 2)))
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)))
          + testHarness.clearTriggerState(new TimeWindow(0, 2))
          + assertEquals(0, testHarness.numStateEntries)
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2)))
          + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4)))
          + }
          +
          + @Test
          + def testMergingWindows() {
          — End diff –

          If the trigger does not support merging, we can drop the next two tests.
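          The cleanup-timer behavior that the test above exercises can be sketched without Flink. The model below is only an illustration and is not the PR's `CountTriggerWithCleanupState` code. All names in it are hypothetical, and the timer rule is inferred from the test comments (e.g. "1000 + 2000 <= 3001 reuse timer 3001"). It assumes a minimum retention of 2000 ms and a maximum of 3000 ms, matching `withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))`. Each window keeps a counter and a cleanup timestamp, which is why the test counts two state entries per active window.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Flink's TriggerResult values (not the Flink API).
sealed trait ModelResult
case object Continue extends ModelResult
case object Fire extends ModelResult
case object FireAndPurge extends ModelResult

/**
 * Flink-free model of a count trigger that also registers an idle-state cleanup timer.
 * minRetention/maxRetention play the role of the query config's idle-state
 * retention times (milliseconds). All names are illustrative assumptions.
 */
final class CleanupCountTriggerModel(maxCount: Long, minRetention: Long, maxRetention: Long) {
  private var now = 0L                                  // current processing time
  private val counts = mutable.Map.empty[String, Long]  // per-window element counter
  private val timers = mutable.Map.empty[String, Long]  // per-window cleanup timestamp

  def onElement(window: String): ModelResult = {
    // Reuse the existing timer if it still covers now + minRetention; otherwise
    // replace it (i.e. drop the old timer) with a new one at now + maxRetention.
    if (timers.get(window).forall(t => now + minRetention > t)) {
      timers(window) = now + maxRetention
    }
    val count = counts.getOrElse(window, 0L) + 1
    if (count >= maxCount) {
      counts.remove(window) // counter is reset on FIRE; the cleanup timer stays
      Fire
    } else {
      counts(window) = count
      Continue
    }
  }

  /** Advances processing time; every window whose cleanup timer is due is purged. */
  def advanceProcessingTime(time: Long): List[(String, ModelResult)] = {
    now = time
    val due = timers.filter { case (_, t) => t <= time }.keys.toList
    due.foreach { w => timers.remove(w); counts.remove(w) }
    due.map(w => (w, FireAndPurge))
  }

  def numStateEntries: Int = counts.size + timers.size
  def cleanupTimeOf(window: String): Option[Long] = timers.get(window)
}
```

With `maxCount = 10`, an element at processing time 1 registers a cleanup time of 3001. An element at 1000 reuses that timer, and advancing to 3001 purges both state entries. After that, an element at 3001 registers 6001, and an element at 4002 pushes the timer out to 7002, as in the test. Reusing the timer while it still covers `now + minRetention` avoids registering a new timer for every element.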

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3919

          Hi @fhueske, thanks a lot for your suggestions. I have updated the PR according to the comments.

          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r116932844

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala —
          @@ -19,44 +19,46 @@ package org.apache.flink.table.runtime.triggers

          import java.lang.{Long => JLong}

          +import org.apache.flink.annotation.PublicEvolving
          import org.apache.flink.api.common.functions.ReduceFunction
          import org.apache.flink.api.common.state._
          import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
          import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
          -import org.apache.flink.streaming.api.windowing.windows.Window
          +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
          import org.apache.flink.table.api.{StreamQueryConfig, Types}
          -import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState.Sum
          +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger.Sum

          -class CountTriggerWithCleanupState[W <: Window](queryConfig: StreamQueryConfig, maxCount: Long)
          -  extends Trigger[Any, W] {
          +/**
          + * A {@link Trigger} that fires once the count of elements in a pane reaches the given count
          + * or the cleanup timer is triggered.
          + */
          +@PublicEvolving
          — End diff –

          We only annotate classes of the core, DataSet, and DataStream APIs with stability annotations.
          `@PublicEvolving` should be removed.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3919

          fhueske Fabian Hueske added a comment -

          Fixed for 1.3.0 with 36cac0fb64ade6debc66f06e00d4184cdc0fba5f
          Fixed for 1.4.0 with d85d969334e89d83aec60f9bb3d2c69a4701eb54

          githubbot ASF GitHub Bot added a comment -

          Github user tedyu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r117121382

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
          case SlidingGroupWindow(_, timeField, size, slide)
          if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
          stream.countWindow(toLong(size), toLong(slide))
          + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
          — End diff –

          Should this be `toLong(size)`?

          githubbot ASF GitHub Bot added a comment -

          Github user tedyu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r117122323

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
          "non-windowed GroupBy aggregation.")
          }

          + val isCountWindow = window match {
          + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
          + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
          + case _ => false
          + }
          +
          + if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
          + LOG.warn(
          — End diff –

          Should this be `error()`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r117183206

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -245,6 +264,7 @@ object DataStreamGroupWindowAggregate {
          case SlidingGroupWindow(_, timeField, size, slide)
          if isProctimeAttribute(timeField) && isRowCountLiteral(size) =>
          stream.countWindow(toLong(size), toLong(slide))
          + .trigger(StateCleaningCountTrigger.of(queryConfig, toLong(slide)));
          — End diff –

          `slide` is correct here (check `KeyedStream.countWindow(long, long)`).

          The default trigger is replaced by a trigger that additionally registers a cleanup timer. The count-based trigger policy stays the same.
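          The small Flink-free model of a sliding count window below illustrates why `slide`, and not `size`, is the right trigger count. It assumes that the DataStream API's `countWindow(size, slide)` combines a `GlobalWindow` assigner, a `CountEvictor` keeping `size` elements, and a `CountTrigger` firing every `slide` elements. The object and method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

/**
 * Flink-free model (hypothetical names) of a sliding count window on a global window:
 * a count trigger that FIREs every `slide` elements, plus an evictor that keeps at
 * most the last `size` elements before the window contents are emitted.
 */
object SlidingCountWindowModel {
  def run(elements: Seq[Int], size: Int, slide: Int): List[List[Int]] = {
    val buffer = ArrayBuffer.empty[Int]       // window contents (FIRE does not purge them)
    val fired = ArrayBuffer.empty[List[Int]]  // emitted panes
    var count = 0                             // trigger counter, reset after each FIRE
    for (e <- elements) {
      buffer += e
      count += 1
      if (count >= slide) {
        count = 0
        // Evict from the front so that at most `size` elements remain.
        if (buffer.size > size) buffer.remove(0, buffer.size - size)
        fired += buffer.toList
      }
    }
    fired.toList
  }
}
```

With `size = 3` and `slide = 2`, the elements 1 to 6 produce the panes [1, 2], [2, 3, 4], and [4, 5, 6]. If the trigger counted `size` instead, it would fire only every third element and turn the sliding window into a tumbling one ([1, 2, 3], [4, 5, 6]). So the replacement trigger has to keep the `slide` count and only add the cleanup timer.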

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3919#discussion_r117183489

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala —
          @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
          "non-windowed GroupBy aggregation.")
          }

          + val isCountWindow = window match {
          + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
          + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
          + case _ => false
          + }
          +
          + if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
          + LOG.warn(
          — End diff –

          I think this can be argued either way. If we logged an error, we could just as well throw an exception and prevent the program from being executed. However, there are also other strategies for dealing with state cleanup, e.g., configuring RocksDB.
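          The trade-off can be made concrete with a small sketch of the check from the diff. The window types below are hypothetical stand-ins for the Table API's logical windows, and `-1` stands for "no retention time configured". The `strict` flag models the alternative of failing the program instead of logging a warning. None of these names are Flink APIs.

```scala
// Hypothetical stand-ins for the Table API's logical group windows (not Flink classes).
sealed trait WindowSpec
final case class TumblingWindowSpec(sizeIsRowCount: Boolean) extends WindowSpec
final case class SlidingWindowSpec(sizeIsRowCount: Boolean) extends WindowSpec
case object SessionWindowSpec extends WindowSpec

object RetentionCheck {
  // Same shape as the `isCountWindow` match in the diff.
  def isCountWindow(window: WindowSpec): Boolean = window match {
    case TumblingWindowSpec(true) => true
    case SlidingWindowSpec(true)  => true
    case _                        => false
  }

  /**
   * Mirrors the diff's condition (keyed count window, no retention time configured).
   * Lenient mode returns a warning message; strict mode throws instead.
   */
  def check(window: WindowSpec, numGroupingKeys: Int, minRetentionMs: Long,
            strict: Boolean): Option[String] =
    if (isCountWindow(window) && numGroupingKeys > 0 && minRetentionMs < 0) {
      val msg = "Keyed count window without idle state retention time: state may grow without bound."
      if (strict) throw new IllegalStateException(msg)
      Some(msg)
    } else None
}
```

Keeping the lenient (warning) behavior leaves room for other cleanup strategies, such as the RocksDB configuration mentioned above, while `strict = true` would reject such queries outright.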


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0
              Watchers:
              4
