Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels:
      None

      Description

      Currently, the keyed and non-keyed operators in the CEP library have separate implementations. This issue aims to unify them into a single implementation.

      The new implementation will always be applied to a keyed stream. For non-keyed use cases, the input stream will be keyed on a dummy key, as is done in the DataStream.windowAll() method, where the input stream is keyed using the NullByteKeySelector.

      This is a first step towards making the CEP operators rescalable.
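
The dummy-key approach described above can be illustrated with a small, self-contained sketch. It does not use Flink: the `KeySelector` interface and the `keyBy` helper below are simplified local stand-ins (assumptions made for illustration), and only the idea of a selector that maps every element to the same `Byte` key is taken from the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DummyKeySketch {

    /** Simplified local stand-in for a key selector interface (not Flink's own type). */
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    /** Maps every element to the same constant key, so all elements share one partition. */
    static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
        @Override
        public Byte getKey(T value) {
            return (byte) 0;
        }
    }

    /** Hypothetical helper that groups elements by key, standing in for a keyBy step. */
    static <T, K> Map<K, List<T>> keyBy(List<T> input, KeySelector<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(selector.getKey(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        Map<Byte, List<String>> keyed = keyBy(events, new NullByteKeySelector<String>());
        // With a constant key there is exactly one group holding every element.
        System.out.println(keyed.size());          // 1
        System.out.println(keyed.get((byte) 0));   // [a, b, c]
    }
}
```

Because every element lands under the same key, a keyed operator applied to such a stream behaves like a non-keyed one, which is what allows a single keyed implementation to cover both cases.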

          Activity

          kkl0u Kostas Kloudas added a comment -

          Closed on master with 15ae922ad4151701cbb4e0df207f43d0094366d1

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3375

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3375

          Thanks @tillrohrmann ! I integrated your comments and merged it.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3375#discussion_r102462479

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---
          @@ -40,25 +40,27 @@
            public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
            private static final long serialVersionUID = 5328573789532074581L;

          - public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
          + public KeyedCEPPatternOperator(
          + TypeSerializer<IN> inputSerializer,
          + boolean isProcessingTime,
          + KeySelector<IN, KEY> keySelector,
          + TypeSerializer<KEY> keySerializer,
          + NFACompiler.NFAFactory<IN> nfaFactory) {
          --- End diff --

          Double indentation for method declarations is better because it makes the parameter list easier to distinguish from the body.
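
To illustrate the indentation point, here is a minimal sketch. The class and its fields are hypothetical and are not taken from the pull request; the only thing it is meant to show is a parameter list indented two levels while the body is indented one level.

```java
public class IndentationSketch {

    /** Hypothetical operator-like class, used only to show the formatting. */
    static class PatternOperator {
        private final String name;
        private final boolean isProcessingTime;

        // Parameters are indented two levels and the body one level,
        // so the end of the parameter list is easy to spot.
        PatternOperator(
                String name,
                boolean isProcessingTime) {
            this.name = name;
            this.isProcessingTime = isProcessingTime;
        }

        String describe() {
            return name + (isProcessingTime ? " (processing time)" : " (event time)");
        }
    }

    public static void main(String[] args) {
        System.out.println(new PatternOperator("cep", true).describe());
    }
}
```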

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3375#discussion_r102461630

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
          @@ -100,27 +119,21 @@ public void open() throws Exception {

            if (nfaOperatorState == null) {
            nfaOperatorState = getPartitionedState(
          - new ValueStateDescriptor<NFA<IN>>(
          - NFA_OPERATOR_STATE_NAME,
          - new NFA.Serializer<IN>()));
          + new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
            }

            @SuppressWarnings("unchecked,rawtypes")
            TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
          - (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
          + (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());

            if (priorityQueueOperatorState == null) {
            priorityQueueOperatorState = getPartitionedState(
          - new ValueStateDescriptor<>(
          - PRIORIRY_QUEUE_STATE_NAME,
          - new PriorityQueueSerializer<>(
          - streamRecordSerializer,
          - new PriorityQueueStreamRecordFactory<IN>())));
          + new ValueStateDescriptor<>(PRIORITY_QUEUE_STATE_NAME,
          + new PriorityQueueSerializer<>(streamRecordSerializer, new PriorityQueueStreamRecordFactory<IN>())));
          --- End diff --

          When making reformatting changes, please try not to break a consistent style. When breaking long parameter lists, imo, every parameter should be on a separate line and indented identically. This is not the case with `PRIORITY_QUEUE_STATE_NAME`.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3375#discussion_r102462053

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
          @@ -127,16 +134,36 @@
            keySerializer,
            nfaFactory));
            } else {
          - patternStream = inputStream.transform(
          +
          + KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
          + TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
          +
          + patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
            "TimeoutCEPPatternOperator",
            eitherTypeInformation,
          - new TimeoutCEPPatternOperator<>(
          + new TimeoutKeyedCEPPatternOperator<>(
            inputSerializer,
            isProcessingTime,
          + keySelector,
          + keySerializer,
            nfaFactory
            )).forceNonParallel();
            }

            return patternStream;
            }
          +
          + /**
          + * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
          --- End diff --

          Does not seem to fit here. Copy & paste artifact?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3375#discussion_r102462322

          --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
          @@ -127,16 +134,36 @@
            keySerializer,
            nfaFactory));
            } else {
          - patternStream = inputStream.transform(
          +
          + KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
          + TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
          +
          + patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
            "TimeoutCEPPatternOperator",
            eitherTypeInformation,
          - new TimeoutCEPPatternOperator<>(
          + new TimeoutKeyedCEPPatternOperator<>(
            inputSerializer,
            isProcessingTime,
          + keySelector,
          + keySerializer,
            nfaFactory
            )).forceNonParallel();
            }

            return patternStream;
            }
          +
          + /**
          + * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
          + * @param <T>
          + */
          + protected static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
          --- End diff --

          Duplicate code. Already exists in `AllWindowedStream`. Better to refactor the existing code to make it usable here.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3375

          R @tillrohrmann

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3375

          FLINK-5845 [cep] Unify keyed and non-keyed operators.

          This PR is the first step towards making the CEP library rescalable and backwards compatible.

          It just merges the keyed and non-keyed operators into a single, keyed one. A more detailed
          description can be found in the related JIRA.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-unification

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3375.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3375


          commit 5fec00f79e9deb14c136b55489e96555d9194d1c
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-02-16T11:02:25Z

          FLINK-5845 [cep] Unify keyed and non-keyed operators.

          Now all CEP operators are keyed, and for the non-keyed
          use cases, we key on a dummy key and use the keyed operator.



            People

            • Assignee: kkl0u Kostas Kloudas
            • Reporter: kkl0u Kostas Kloudas
            • Votes: 0
            • Watchers: 3
