Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: CEP
    • Labels: None

      Description

      This targets making the new CEP operators compatible with their previous versions from Flink 1.1 and Flink 1.2.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3445

          FLINK-5846 [cep] Make the CEP operators backwards compatible

          This also contains a commit for https://issues.apache.org/jira/browse/FLINK-5932

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink cep-unified-bckwrds

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3445.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3445


          commit 311fb69efeb4e7f82a0c166ed3bbd20920dcc6e4
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-02-28T21:31:01Z

          FLINK-5932 initializeState() before legacy state restoring in operator

          commit d4ada9bcb96d269a591fefd35cadbf934f84edd7
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-02-20T13:51:51Z

          FLINK-5846 [cep] Make the CEP operators backwards compatible (Flink 1.1 and 1.2)


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3445

          Please add the savepoints to the rat-plugin exclusions, otherwise the build will fail on Windows.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3445

          Thanks for the comment @zentol !

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r104413190

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java —
          @@ -0,0 +1,243 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.apache.flink.api.common.functions.FilterFunction;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeutils.base.ByteSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.functions.KeySelector;
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.SubEvent;
          +import org.apache.flink.cep.nfa.NFA;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
          +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
          +import org.junit.Test;
          +
          +import java.net.URL;
          +import java.util.Map;
          +import java.util.concurrent.ConcurrentLinkedQueue;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +public class CEPMigrationTest {
          +
          + private static String getResourceFilename(String filename) {
          + ClassLoader cl = CEPMigrationTest.class.getClassLoader();
          + URL resource = cl.getResource(filename);
          + if (resource == null) {
          + throw new NullPointerException("Missing snapshot resource.");
          + }
          + return resource.getFile();
          + }
          +
          + @Test
          + public void testKeyedCEPFunctionMigration() throws Exception {
          — End diff –

          I think this should be called `testKeyedCEPFunctionMigrationFrom12`, and we need a separate test for migrating from 1.1. (You probably forgot, since I can already see the commented-out code and the savepoint you created for this.)

          Same for the other test.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r104413873

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java —
          @@ -73,7 +73,8 @@ public Integer getKey(Event value) throws Exception {
          false,
          keySelector,
          IntSerializer.INSTANCE,

          - new NFAFactory()),
          + new NFAFactory(),
          + true),
          — End diff –

          Ah, I see now what the parameter does. Even if you don't migrate you still set it to true. The parameter basically indicates "would have been a keyed operator whether we actually have legacy state or not". That's why it's always false when restoring from the old non-keyed operator.

          We just use it in the `restoreState()` method so when we don't have legacy state the value is never actually checked.
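
The flag semantics described in this comment can be sketched in isolation. This is a minimal illustration, not the actual CEP operator code: `LegacyAwareOperator`, its `log`, and the `byte[]` legacy-state argument are hypothetical stand-ins. Only the flag name `migratingFromOldKeyedOperator` and the method name `restoreState()` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator that may restore legacy (pre-1.3) state. */
class LegacyAwareOperator {
    // Mirrors the flag discussed in the review; everything else is illustrative.
    private final boolean migratingFromOldKeyedOperator;
    private final List<String> log = new ArrayList<>();

    LegacyAwareOperator(boolean migratingFromOldKeyedOperator) {
        this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
    }

    /** Assumed contract: legacyState is null when there is nothing old to restore. */
    void restoreState(byte[] legacyState) {
        if (legacyState == null) {
            // Without legacy state the flag is never consulted.
            log.add("no legacy state");
            return;
        }
        // Only on the legacy path does the flag select the layout to read.
        log.add(migratingFromOldKeyedOperator
                ? "restore keyed layout"
                : "restore non-keyed layout");
    }

    List<String> log() {
        return log;
    }
}
```

Under these assumptions, passing `true` in a test that never restores legacy state is harmless, because the branch that reads the flag is never reached.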

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r104412655

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java —
          @@ -345,7 +345,8 @@ private void verifyPattern(Object outputObject, Event start, SubEvent middle, Ev
          false,
          keySelector,
          BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),

          - new NFAFactory()),
          + new NFAFactory(),
          + true),
          — End diff –

          Should this be `false`? The parameter name is `migratingFromOldKeyedOperator`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r104412825

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java —
          @@ -73,7 +73,8 @@ public Integer getKey(Event value) throws Exception {
          false,
          keySelector,
          IntSerializer.INSTANCE,

          - new NFAFactory()),
          + new NFAFactory(),
          + true),
          — End diff –

          Should this be `false`? The parameter name is `migratingFromOldKeyedOperator`.

          Same for the other instances in this file.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r104413339

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java —
          @@ -0,0 +1,243 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.apache.flink.api.common.functions.FilterFunction;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeutils.base.ByteSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.functions.KeySelector;
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.SubEvent;
          +import org.apache.flink.cep.nfa.NFA;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
          +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
          +import org.junit.Test;
          +
          +import java.net.URL;
          +import java.util.Map;
          +import java.util.concurrent.ConcurrentLinkedQueue;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +public class CEPMigrationTest {
          +
          + private static String getResourceFilename(String filename) {
          + ClassLoader cl = CEPMigrationTest.class.getClassLoader();
          + URL resource = cl.getResource(filename);
          + if (resource == null) {
          + throw new NullPointerException("Missing snapshot resource.");
          + }
          + return resource.getFile();
          + }
          +
          + @Test
          + public void testKeyedCEPFunctionMigration() throws Exception {
          — End diff –

          Could you also please put the code for generating the snapshot in here, but commented out? Similar to how we did it in `WindowOperatorMigrationTest`.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3445

          @aljoscha , I integrated your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r105682110

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java —
          @@ -75,9 +94,15 @@ public void testKeyedCEPFunctionMigration() throws Exception {
          BasicTypeInfo.INT_TYPE_INFO);

          harness.setup();

          - harness.initializeStateFromLegacyCheckpoint(getResourceFilename(
          - "cep-keyed-savepoint-1.2"));
          -// "cep-keyed-savepoint-1.1"));
          +
          + if (from11) {
          — End diff –

          I think we should have separate test classes for "1.1 to 1.3" and "1.2 to 1.3".

          I just thought about this in the context of https://issues.apache.org/jira/browse/FLINK-5969, where I'm also planning to do it like this. Sorry for bringing this up so late, but I think in the end having a bunch of ifs does not scale well.
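
A rough sketch of the layout suggested here, with one shared base and one thin class per source version instead of `if (from11)` branches. All class and method names below are hypothetical and the "restore" step is reduced to a string. The savepoint file names are the ones that appear in this thread.

```java
/** Hypothetical base: shared migration logic, parameterized by savepoint name. */
abstract class MigrationTestBaseSketch {
    /** Stand-in for "restore the savepoint, feed elements, assert on the output". */
    String runKeyedMigration(String savepointName) {
        return "restored " + savepointName;
    }
}

/** One class per source version; it only picks the savepoint to restore. */
class Migration11to13Sketch extends MigrationTestBaseSketch {
    String testKeyedMigration() {
        return runKeyedMigration("cep-keyed-savepoint-1.1");
    }
}

class Migration12to13Sketch extends MigrationTestBaseSketch {
    String testKeyedMigration() {
        return runKeyedMigration("cep-keyed-savepoint-1.2");
    }
}
```

With this shape, supporting another source version means adding one small subclass rather than another branch inside every test method.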

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r105683186

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java —
          @@ -240,4 +270,159 @@ public Byte getKey(Event value) throws Exception {
          return 0;
          }
          }
          -}
          +
          — End diff –

          Is this the complete code that can be used to regenerate the snapshot files?

          I think `WindowOperatorMigrationTest.testRestoreSessionWindowsWithCountTriggerFromFlink11()` shows how the code that generates the snapshot and the testing code can be interleaved nicely. This format also makes it easy (hopefully, as I will soon find out because I'm re-writing the tests for Flink 1.3 (https://issues.apache.org/jira/browse/FLINK-5969)) to update/adapt the tests on version updates of Flink.
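
To illustrate what interleaving the snapshot-generating code with the testing code might look like, here is a minimal sketch that uses only the Java standard library. It is not taken from `WindowOperatorMigrationTest`: the class, the single-integer "state", and the `regenerate` switch are assumptions made for illustration. In the real tests the generation step is kept as commented-out code rather than behind a flag.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: generation and verification steps live in one method. */
class InterleavedSnapshotSketch {

    static int run(Path snapshot, boolean regenerate) throws IOException {
        int processed = 0;

        // Step 1 (generation): process the elements that precede the snapshot
        // and write it out. This runs once on the old version and afterwards
        // stays next to the test as documentation of how the file was made.
        if (regenerate) {
            processed += 3;
            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(snapshot))) {
                out.writeInt(processed);
            }
        }

        // Step 2 (test): restore from the snapshot file.
        try (DataInputStream in = new DataInputStream(Files.newInputStream(snapshot))) {
            processed = in.readInt();
        }

        // Step 3 (test): continue processing on top of the restored state.
        processed += 2;
        return processed;
    }

    /** Runs once with generation and once restoring only; returns both results. */
    static int[] demo() {
        try {
            Path snapshot = Files.createTempFile("snapshot-sketch", ".bin");
            try {
                return new int[] {run(snapshot, true), run(snapshot, false)};
            } finally {
                Files.deleteIfExists(snapshot);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the generation step sits where it would execute in the original job, a reader can see which elements went in before the snapshot and which are expected only after restoring.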

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r105707379

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java —
          @@ -75,9 +94,15 @@ public void testKeyedCEPFunctionMigration() throws Exception {
          BasicTypeInfo.INT_TYPE_INFO);

          harness.setup();

          - harness.initializeStateFromLegacyCheckpoint(getResourceFilename(
          - "cep-keyed-savepoint-1.2"));
          -// "cep-keyed-savepoint-1.1"));
          +
          + if (from11) {
          — End diff –

          No problem! I will do it.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3445

          Thanks for the comments @aljoscha . I integrated them and I updated https://issues.apache.org/jira/browse/FLINK-5969 to keep track.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r105936184

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java —
          @@ -0,0 +1,241 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.apache.flink.api.common.functions.FilterFunction;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeutils.base.ByteSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.functions.KeySelector;
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.SubEvent;
          +import org.apache.flink.cep.nfa.NFA;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
          +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
          +
          +import java.net.URL;
          +import java.util.Map;
          +import java.util.concurrent.ConcurrentLinkedQueue;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +/**
          + * Base class for the migration tests between different versions.
          + * See {@link CEPMigration11to13Test} for an example of migration test
          + * between Flink-1.1 and Flink-1.3.
          + * */
          — End diff –

          Should have a newline, or at least not two stars on one line 😉

          Show
          githubbot ASF GitHub Bot added a comment - Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3445#discussion_r105936184 — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java — @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base class for the migration tests between different versions. + * See {@link CEPMigration11to13Test} for an example of migration test + * between Flink-1.1 and Flink-1.3. + * */ — End diff – Should have a newline, or at least not two stars on one line 😉
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r105937053

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java —
          @@ -0,0 +1,171 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.junit.Test;
          +
          +public class CEPMigration11to13Test extends CEPMigrationTestBase {
          +
          + @Test
          + public void testKeyedCEPFunctionMigration11() throws Exception {
          + testKeyedCEPFunctionMigration("cep-keyed-savepoint-1.1");
          + }
          +
          + @Test
          + public void testNonKeyedCEPFunctionMigration11() throws Exception {
          + testNonKeyedCEPFunctionMigration("cep-non-keyed-savepoint-1.1");
          + }
          +}
          +
          +/*
          +FLINK 1.1 CODE TO PRODUCE THE SAVEPOINTS. (INCLUDE ALSO THE PATTERN CODE AT THE BOTTOM FOR BOTH CASES)
          +
          +@Test
          +public void keyedCEPOperatorSavepointGen() throws Exception {
          +
          + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
          + private static final long serialVersionUID = -4873366487571254798L;
          +
          + @Override
          + public Integer getKey(Event value) throws Exception {
          + return value.getId();
          + }
          + };
          +
          + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
          + new KeyedCEPPatternOperator<>(
          + Event.createTypeSerializer(),
          + false,
          + keySelector,
          + IntSerializer.INSTANCE,
          + new NFAFactory()));
          + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
          + harness.open();
          +
          + Event startEvent = new Event(42, "start", 1.0);
          + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
          + Event endEvent= new Event(42, "end", 1.0);
          +
          + harness.processElement(new StreamRecord<Event>(startEvent, 1));
          + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
          + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
          +
          + harness.processWatermark(new Watermark(2));
          +
          + // simulate snapshot/restore with empty element queue but NFA state
          + StreamTaskState snapshot = harness.snapshot(1, 1);
          +
          + FileOutputStream out = new FileOutputStream("/Users/kkloudas/Desktop/cep-keyed-savepoint-1.1");
          — End diff –

          This should be something like `"src/test/resources/cep-keyed-savepoint-1.1"`, and the same goes for the other path further down. That way the savepoint is generated directly in the directory where the test expects to find it.
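
          The suggestion can be illustrated with a minimal sketch. This is not code from the PR: `SavepointFiles`, its methods, and the `baseDir` parameter are hypothetical names, and a plain `Serializable` stands in for the Flink 1.1 snapshot object. It only assumes that the generator is run from the module root, so that a relative base directory such as `src/test/resources` resolves to the module's test resources.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of the PR) for writing and reading a serialized snapshot file. */
final class SavepointFiles {

    /** Resolves a snapshot file name against a base directory, e.g. "src/test/resources". */
    static Path resolve(String baseDir, String fileName) {
        return Paths.get(baseDir, fileName);
    }

    /** Serializes the snapshot to the target path, creating parent directories if needed. */
    static void write(Path target, Serializable snapshot) throws IOException {
        Path parent = target.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(target))) {
            oos.writeObject(snapshot);
        }
    }

    /** Reads back whatever object was serialized at the given path. */
    static Object read(Path source) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(source))) {
            return ois.readObject();
        }
    }
}
```

          With a relative base directory, the generated file no longer depends on one developer's home directory, and the test can pick it up from the classpath by its bare file name once the resources are copied during the build.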

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3445

          Hi @aljoscha, I have integrated your comments and the build passes on Travis. Let me know what you think.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r106659165

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java —
          @@ -0,0 +1,268 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.apache.flink.api.common.functions.FilterFunction;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeutils.base.ByteSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.functions.KeySelector;
          +import org.apache.flink.api.java.functions.NullByteKeySelector;
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.SubEvent;
          +import org.apache.flink.cep.nfa.NFA;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
          +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
          +import org.junit.Test;
          +
          +import java.net.URL;
          +import java.util.Map;
          +import java.util.concurrent.ConcurrentLinkedQueue;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +public class CEPMigration11to13Test {
          +
          + private static String getResourceFilename(String filename) {
          + ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
          + URL resource = cl.getResource(filename);
          + if (resource == null) {
          + throw new NullPointerException("Missing snapshot resource.");
          + }
          + return resource.getFile();
          + }
          +
          + @Test
          + public void testKeyedCEPOperatorMigratation() throws Exception {
          +
          + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
          + private static final long serialVersionUID = -4873366487571254798L;
          +
          + @Override
          + public Integer getKey(Event value) throws Exception {
          + return value.getId();
          + }
          + };
          +
          + final Event startEvent = new Event(42, "start", 1.0);
          + final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
          + final Event endEvent = new Event(42, "end", 1.0);
          +
          + // uncomment these lines for regenerating the snapshot on Flink 1.1
          + /*
          + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
          + new KeyedCEPPatternOperator<>(
          + Event.createTypeSerializer(),
          + false,
          + keySelector,
          + IntSerializer.INSTANCE,
          + new NFAFactory()));
          + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
          + harness.open();
          + harness.processElement(new StreamRecord<Event>(startEvent, 1));
          + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
          + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
          + harness.processWatermark(new Watermark(2));
          + // simulate snapshot/restore with empty element queue but NFA state
          + StreamTaskState snapshot = harness.snapshot(1, 1);
          + FileOutputStream out = new FileOutputStream(
          + "src/test/resources/cep-keyed-snapshot-1.1-old");
          + ObjectOutputStream oos = new ObjectOutputStream(out);
          + oos.writeObject(snapshot);
          + out.close();
          + harness.close();
          + */
          +
          + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
          + new KeyedOneInputStreamOperatorTestHarness<>(
          + new KeyedCEPPatternOperator<>(
          + Event.createTypeSerializer(),
          + false,
          + keySelector,
          + IntSerializer.INSTANCE,
          + new NFAFactory(),
          + true),
          + keySelector,
          + BasicTypeInfo.INT_TYPE_INFO);
          +
          + harness.setup();
          + harness.initializeStateFromLegacyCheckpoint(getResourceFilename("cep-keyed-snapshot-1.1"));
          + harness.open();
          +
          + harness.processElement(new StreamRecord<Event>(middleEvent, 3));
          + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
          + harness.processElement(new StreamRecord<>(endEvent, 5));
          +
          + harness.processWatermark(new Watermark(20));
          +
          + ConcurrentLinkedQueue<Object> result = harness.getOutput();
          +
          + // watermark and the result
          + assertEquals(2, result.size());
          +
          + Object resultObject = result.poll();
          + assertTrue(resultObject instanceof StreamRecord);
          + StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
          + assertTrue(resultRecord.getValue() instanceof Map);
          +
          + @SuppressWarnings("unchecked")
          + Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
          +
          + assertEquals(startEvent, patternMap.get("start"));
          + assertEquals(middleEvent, patternMap.get("middle"));
          + assertEquals(endEvent, patternMap.get("end"));
          +
          + harness.close();
          + }
          +
          + @Test
          + public void testNonKeyedCEPFunctionMigration() throws Exception {
          +
          + final Event startEvent = new Event(42, "start", 1.0);
          + final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
          + final Event endEvent= new Event(42, "end", 1.0);
          +
          + // uncomment these lines for regenerating the snapshot on Flink 1.1
          + /*
          + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
          + new CEPPatternOperator<>(
          + Event.createTypeSerializer(),
          + false,
          + new NFAFactory()));
          + harness.open();
          + harness.processElement(new StreamRecord<Event>(startEvent, 1));
          + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
          + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
          + harness.processWatermark(new Watermark(2));
          +
          + // simulate snapshot/restore with empty element queue but NFA state
          + StreamTaskState snapshot = harness.snapshot(1, 1);
          + FileOutputStream out = new FileOutputStream(
          + "src/test/resources/cep-non-keyed-snapshot-1.1-old");
          — End diff –

          The -old seems to be a leftover?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3445#discussion_r106659109

          — Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java —
          @@ -0,0 +1,268 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.cep.operator;
          +
          +import org.apache.flink.api.common.functions.FilterFunction;
          +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
          +import org.apache.flink.api.common.typeutils.base.ByteSerializer;
          +import org.apache.flink.api.common.typeutils.base.IntSerializer;
          +import org.apache.flink.api.java.functions.KeySelector;
          +import org.apache.flink.api.java.functions.NullByteKeySelector;
          +import org.apache.flink.cep.Event;
          +import org.apache.flink.cep.SubEvent;
          +import org.apache.flink.cep.nfa.NFA;
          +import org.apache.flink.cep.nfa.compiler.NFACompiler;
          +import org.apache.flink.cep.pattern.Pattern;
          +import org.apache.flink.streaming.api.watermark.Watermark;
          +import org.apache.flink.streaming.api.windowing.time.Time;
          +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
          +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
          +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
          +import org.junit.Test;
          +
          +import java.net.URL;
          +import java.util.Map;
          +import java.util.concurrent.ConcurrentLinkedQueue;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +
          +public class CEPMigration11to13Test {
          +
          + private static String getResourceFilename(String filename) {
          + ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
          + URL resource = cl.getResource(filename);
          + if (resource == null) {
          + throw new NullPointerException("Missing snapshot resource.");
          + }
          + return resource.getFile();
          + }
          +
          + @Test
          + public void testKeyedCEPOperatorMigratation() throws Exception {
          +
          + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
          + private static final long serialVersionUID = -4873366487571254798L;
          +
          + @Override
          + public Integer getKey(Event value) throws Exception {
          + return value.getId();
          + }
          + };
          +
          + final Event startEvent = new Event(42, "start", 1.0);
          + final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
          + final Event endEvent = new Event(42, "end", 1.0);
          +
          + // uncomment these lines for regenerating the snapshot on Flink 1.1
          + /*
          + OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
          + new KeyedCEPPatternOperator<>(
          + Event.createTypeSerializer(),
          + false,
          + keySelector,
          + IntSerializer.INSTANCE,
          + new NFAFactory()));
          + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
          + harness.open();
          + harness.processElement(new StreamRecord<Event>(startEvent, 1));
          + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
          + harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
          + harness.processWatermark(new Watermark(2));
          + // simulate snapshot/restore with empty element queue but NFA state
          + StreamTaskState snapshot = harness.snapshot(1, 1);
          + FileOutputStream out = new FileOutputStream(
          — End diff –

          The `-old` seems to be a leftover?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3445

          Thanks @aljoscha. I will wait for Travis and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3445

          kkl0u Kostas Kloudas added a comment -

          Merged with 521a53d9ad68a3f16a32e08843a6fca2bd4e439d


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              kkl0u Kostas Kloudas

                Development