Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: None
    • Component/s: CEP
    • Labels:
      None

      Description

      This issue aims to make the operators in the CEP library rescalable. Once this is implemented, users will be able to take a savepoint and restart their job with a different parallelism.
      This issue depends on https://issues.apache.org/jira/browse/FLINK-5845.

      To achieve this, we introduce the TimeServiceHandler in the AbstractStreamOperator. It keeps the registered InternalTimerServices (previously held directly by the AbstractStreamOperator) and a new service called KeyRegistry. The KeyRegistry will be fault-tolerant and rescalable, and will allow registering keys together with a callback that is invoked for each registered key upon reception of a watermark. This can be seen as keeping (recurring) timers for each of the registered keys that fire "at the next watermark".

      After introducing this service, upon reception of a watermark, all the processing of the NFAs will be delegated to the callback.
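
      The callback mechanism described above can be sketched roughly as follows. This is a simplified, in-memory model and not the actual Flink implementation: the class name KeyRegistrySketch, the method names, and the callback signature (a plain long timestamp instead of a Watermark object) are all assumptions made for illustration, and checkpointing and key-group redistribution are left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the idea in the description; NOT Flink's actual KeyRegistry. */
public class KeyRegistrySketch {

    /** Hypothetical callback, invoked once per registered key for each watermark. */
    interface OnWatermarkCallback<K> {
        void onWatermark(K key, long watermarkTimestamp);
    }

    /** Hypothetical registry: a key stays registered until it is explicitly removed. */
    static final class KeyRegistry<K> {
        private final Set<K> keys = new LinkedHashSet<>();
        private OnWatermarkCallback<K> callback;

        void setCallback(OnWatermarkCallback<K> callback) {
            this.callback = callback;
        }

        /** Returns true if the key was not registered before. */
        boolean registerKey(K key) {
            return keys.add(key);
        }

        /** Returns true if the key was registered and has now been removed. */
        boolean unregisterKey(K key) {
            return keys.remove(key);
        }

        /** Behaves like a recurring timer per key that fires "at the next watermark". */
        void invokeOnWatermark(long watermarkTimestamp) {
            if (callback == null) {
                return;
            }
            // Iterate over a copy so that the callback may unregister keys safely.
            for (K key : new ArrayList<>(keys)) {
                callback.onWatermark(key, watermarkTimestamp);
            }
        }
    }

    /** Registers two keys around two watermarks and records every callback invocation. */
    static List<String> demo() {
        List<String> fired = new ArrayList<>();
        KeyRegistry<String> registry = new KeyRegistry<>();
        // In the CEP operator, this is where the per-key NFA processing would run.
        registry.setCallback((key, ts) -> fired.add(key + "@" + ts));

        registry.registerKey("user-1");
        registry.invokeOnWatermark(10L);

        registry.registerKey("user-2");
        registry.registerKey("user-1"); // duplicate registration has no effect
        registry.invokeOnWatermark(20L);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [user-1@10, user-1@20, user-2@20]
    }
}
```

      In this sketch a key keeps receiving the callback on every later watermark until it is unregistered, which mirrors the "recurring timer" description. Making the set of keys part of the keyed, checkpointed state is what would allow it to be redistributed when the parallelism changes.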

        Issue Links

          Activity

          till.rohrmann Till Rohrmann added a comment -

          Adding a description containing some more details about the problem could be helpful.

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3307

          FLINK-5420 Make CEP operators rescalable

          Transforms the CEP operators into ProcessFunctions that use only managed keyed state. Rescalability now comes out-of-the-box. In addition, for the keyed operator, the list of seen keys is replaced by timers set by the incoming elements that will fire at the next watermark.

          More information about how rescalability is achieved can be found here:
          https://issues.apache.org/jira/browse/FLINK-5420

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-ref

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3307.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3307


          commit 230ee80355f7219d56d1010d8850dbe39866c7e8
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-01-27T14:30:32Z

          FLINK-5420 Make CEP operators rescalable

          Transforms the CEP operators into ProcessFunctions
          that use only managed keyed state. Rescalability
          now comes out-of-the-box. In addition, for the
          keyed operator, the list of seen keys is replaced
          by timers set by the incoming elements that will
          fire at the next watermark.


          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3307

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3412

          FLINK-5420 [cep] Make the CEP operators rescalable

          More on the related JIRA: https://issues.apache.org/jira/browse/FLINK-5420

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-uni-rescale

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3412.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3412


          commit ee5862973d2b8b8dca8333eaaff6ca82a4e9a069
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-02-24T09:34:43Z

          FLINK-5420 [cep] Make the CEP operators rescalable

          Introduces the KeyRegistry in the TimeServiceHandler
          which allows to specify a callback and register keys
          for which we want this callback to be invoked on each
          watermark.

          Given this service, now the CEP operator has only
          keyed state, and the non-keyed one (keys) are
          handled by the KeyRegistry.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r102940901

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyRegistry.java —
          @@ -0,0 +1,257 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.operators;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
          +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
          +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
          +import org.apache.flink.runtime.state.KeyGroupsList;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.util.InstantiationUtil;
          +
          +import java.io.IOException;
          +import java.util.HashSet;
          +import java.util.Set;
          +
          +import static org.apache.flink.util.Preconditions.checkArgument;
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * The key registry allows to register a {@link OnWatermarkCallback} and multiple keys, for which
          + * the callback will be invoked periodically, upon reception of each subsequent {@link Watermark},
          + * after the registration of the key.
          + * <p>
          + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
          + *
          + * @param <K> The type of key returned by the {@code KeySelector}.
          + * */
          +public class KeyRegistry<K> {
          — End diff –

          To keep it in line with the other services this could be called `InternalWatermarkCallbackService`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r102940776

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimeServiceHandler.java —
          @@ -0,0 +1,223 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.operators;
          +
          +import org.apache.flink.annotation.VisibleForTesting;
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
          +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
          +import org.apache.flink.runtime.state.KeyGroupsList;
          +import org.apache.flink.runtime.state.VoidNamespaceSerializer;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
          +import org.apache.flink.util.Preconditions;
          +
          +import java.io.IOException;
          +import java.util.HashMap;
          +import java.util.Map;
          +
          +/**
          + * A handler keeping all the time-related services available to all operators extending the
          + * {@link AbstractStreamOperator}. These are the different {@link HeapInternalTimerService timer services}
          + * and the {@link KeyRegistry}.
          + *
          + * <b>NOTE:</b> These services are only available to keyed operators.
          + *
          + * @param <K> The type of keys used for the timers and the registry.
          + * @param <N> The type of namespace used for the timers.
          + */
          +public class TimeServiceHandler<K, N> {
          — End diff –

          This should be package-private since it's only used by the `AbstractStreamOperator`. Maybe also mark it as `@Internal` just to be safe.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r102939631

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java —
          @@ -887,6 +864,30 @@ public void close() {
          // Watermark handling
          // ------------------------------------------------------------------------

          + public <K> void registerWatermarkCallback(KeyRegistry.OnWatermarkCallback<K> callback, TypeSerializer<K> keySerializer) {
          — End diff –

          The key serialiser can be retrieved from `getKeyedStateBackend().getKeySerializer()`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r102940564

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java —
          @@ -887,6 +864,30 @@ public void close() {
          // Watermark handling
          // ------------------------------------------------------------------------

          + public <K> void registerWatermarkCallback(KeyRegistry.OnWatermarkCallback<K> callback, TypeSerializer<K> keySerializer) {
          — End diff –

          Come to think of it, we should probably only expose a method like `getInternalWatermarkCallbackService()`. (I discussed this offline with @kl0u)

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3412

          Thanks for the comments @aljoscha. I integrated them and am waiting for Travis.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r103471738

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java —
          @@ -0,0 +1,39 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.operators;
          +
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +
          +import java.io.IOException;
          +
          +/**
          + * A callback registered with the {@link InternalWatermarkCallbackService} service. This callback will
          + * be invoked for all keys registered with the service, upon reception of a watermark.
          + */
          +public interface OnWatermarkCallback<KEY> {
          — End diff –

          This should probably also be `@Internal`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3412#discussion_r103471629

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java —
          @@ -0,0 +1,237 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.api.operators;
          +
          +import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
          +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
          +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
          +import org.apache.flink.runtime.state.KeyGroupsList;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.util.InstantiationUtil;
          +
          +import java.io.IOException;
          +import java.util.HashSet;
          +import java.util.Set;
          +
          +import static org.apache.flink.util.Preconditions.checkArgument;
          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          +/**
          + * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback}
          + * and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received
          + * (after the registration of the key).
          + * <p>
          + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
          + *
          + * @param <K> The type of key returned by the {@code KeySelector}.
          + * */
          +public class InternalWatermarkCallbackService<K> {
          — End diff –

          Slight Javadoc typo, and this should probably be `@Internal`.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3412

          Thanks for the comments @aljoscha.

          kkl0u Kostas Kloudas added a comment -

          Merged at commit daf0ccda4dc60a267be7b8074d40e48d22ccb13f

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3412


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
                Resolved:

                Development