FLINK-6844

TraversableSerializer should implement compatibility methods

    Details

      Description

      The TraversableSerializer may be used as a serializer for managed state and takes part in checkpointing, therefore should implement the compatibility methods.
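
      As a rough illustration of what "compatibility methods" means here, the Scala sketch below models a serializer that can describe its configuration at checkpoint time and validate that description on restore. It is a simplified, hypothetical model and not Flink's actual classes: `Serializer`, `ConfigSnapshot`, `Compatibility` and both concrete serializers are made-up names. Only the method name `snapshotConfiguration` is taken from the stack trace later in this issue; `ensureCompatibility` is assumed to be its restore-side counterpart. The point it tries to show is that a collection serializer has to implement both methods and delegate to its element serializer, because a wrapper around a serializer that still throws fails the whole snapshot.

      ```scala
      // Hypothetical, simplified model; none of these types are Flink classes.
      trait ConfigSnapshot
      final case class ElementSnapshot(typeName: String) extends ConfigSnapshot
      final case class CollectionSnapshot(element: ConfigSnapshot) extends ConfigSnapshot

      sealed trait Compatibility
      case object Compatible extends Compatibility
      case object RequiresMigration extends Compatibility

      trait Serializer {
        // Default mimics a serializer without compatibility methods:
        // any checkpoint that reaches it fails.
        def snapshotConfiguration(): ConfigSnapshot =
          throw new UnsupportedOperationException()

        def ensureCompatibility(restored: ConfigSnapshot): Compatibility =
          throw new UnsupportedOperationException()
      }

      final class StringSerializer extends Serializer {
        override def snapshotConfiguration(): ConfigSnapshot =
          ElementSnapshot("String")

        override def ensureCompatibility(restored: ConfigSnapshot): Compatibility =
          if (restored == ElementSnapshot("String")) Compatible else RequiresMigration
      }

      // A collection serializer snapshots itself by snapshotting its element
      // serializer, and checks compatibility by delegating the nested snapshot.
      final class CollectionSerializer(element: Serializer) extends Serializer {
        override def snapshotConfiguration(): ConfigSnapshot =
          CollectionSnapshot(element.snapshotConfiguration())

        override def ensureCompatibility(restored: ConfigSnapshot): Compatibility =
          restored match {
            case CollectionSnapshot(inner) => element.ensureCompatibility(inner)
            case _                         => RequiresMigration
          }
      }

      // Usage: snapshot at checkpoint time, validate on restore.
      val outer = new CollectionSerializer(new StringSerializer)
      val snapshot = outer.snapshotConfiguration()
      val result = outer.ensureCompatibility(snapshot)
      ```

      In this model, wrapping a serializer that keeps the throwing defaults (for example `new CollectionSerializer(new Serializer {})`) makes `snapshotConfiguration()` fail from the outermost serializer, which has the same shape as the nested `snapshotConfiguration` frames in the stack trace reported below.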

        Issue Links

          Activity

          yuzhihong@gmail.com Ted Yu added a comment -

          This was first reported by Mahesh: http://search-hadoop.com/m/Flink/VkLeQDkhbf1oulbz1?subj=Flink+1+3+Checkpointing+failing

          mingleizhang mingleizhang added a comment -

          I will look into this issue in the next few days.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          mingleizhang I actually already have a fix ready for this; the PR is coming up.
          It would also be helpful if you'd like to review that!

          mingleizhang mingleizhang added a comment -

          Tzu-Li (Gordon) Tai Yes. I will review that when the patch is available.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4081

          FLINK-6844 [scala] Implement compatibility methods for TraversableSerializer

          The `TraversableSerializer` is used for Scala collections, and therefore may take part in checkpointing and should have the compatibility methods implemented.

          I'm also currently trying out whether or not it makes sense / is easily possible to have some test base that guards against these kinds of issues. Otherwise, having one per serializer might not make sense.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6844

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4081.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4081


          commit 3fbf693610520a983f68a0091057a0af44108834
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-05T18:52:57Z

          FLINK-6844 [scala] Implement compatibility methods for TraversableSerializer


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4081

          Tested this with a stateful job that uses Scala collections as state.
          Merging this ..

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4081

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3 via e1e207c898ed436df656d01364cf0e5fa818b730.
          Fixed for master via c11d5ed5388a5a30ca4ea0c5ac68e22e5989cb54.

          shashank734 Shashank Agarwal added a comment -

          Tzu-Li (Gordon) Tai

          I checked with the patch, and it is not working with KeyedCEPPatternOperator. The commit no longer throws an exception, so only a log line is printed and there is no exception. But checkpointing is still not working; it was working fine in 1.2.1.

          2017-06-20 15:26:25,518 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom File Source (1/1) is not being executed at the moment. Aborting checkpoint.
          
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Shashank Agarwal do you mean that applying the patch still doesn't work? Could you paste the actual checkpoint failure error trace? The one you posted doesn't seem to reveal anything.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          btw, please test this on the latest release-1.3. There were some further follow-up fixes after the commits mentioned above in the comments.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          For example, there were some other bugs specific to the serializers in the KeyedCEPPatternOperator. I don't think simply applying the mentioned commits would resolve the problem with CEP checkpointing completely.

          shashank734 Shashank Agarwal added a comment -

          Tzu-Li (Gordon) Tai

          I have applied this patch on https://github.com/apache/flink/tree/release-1.3.0, the latest 1.3.0 release. I need a stable version.

          shashank734 Shashank Agarwal added a comment -

          It's no longer printing any stack traces after applying the patch. Maybe that's because you have removed

          throw new UnsupportedOperationException()
          
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Shashank Agarwal

          Can you try https://github.com/apache/flink/tree/release-1.3? The release process for the next stable version 1.3.1 is actually in-progress, and will be forked from that branch very soon.

          Alternatively, you can also show me the exact error trace, so I can see if it's actually fixed in other commits already.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Then I'm pretty sure that you haven't actually applied the patch. The TraversableSerializer should not be throwing UnsupportedOperationException anymore, as you can see in the patch.

          shashank734 Shashank Agarwal added a comment -

          I have cross-verified with the source code and the Flink dashboard that I have successfully applied the patch. It's just not printing any other error stack traces.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          The thing is, I can't tell anything from:

          2017-06-20 15:26:25,518 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom File Source (1/1) is not being executed at the moment. Aborting checkpoint.
          

          It's also not related to the CEP operator.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Yes, so what I mean is, the checkpointing problem you pasted does not imply a problem with this JIRA.
          What source are you using? Could you find out why it isn't being executed while the job was running? That's the actual root cause of the checkpoint failure.

          shashank734 Shashank Agarwal added a comment -

          Tzu-Li (Gordon) Tai

          I am using Kafka as the source and there's no issue with that. I am using Flink CEP; this code was working fine with 1.2.0 and 1.2.1, and I have now updated my applications to 1.3.0.

          Applications where I haven't used CEP are working. The application where I have used CEP was throwing the following exception and terminating checkpointing for all.

          java.lang.Exception: Could not perform checkpoint 1 for operator KeyedCEPPatternOperator -> Map (6/6).
          	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
          	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
          	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
          	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
          	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
          	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
          	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
          	at java.lang.Thread.run(Thread.java:745)
          Caused by: java.lang.Exception: Could not complete snapshot 1 for operator KeyedCEPPatternOperator -> Map (6/6).
          	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
          	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
          	... 8 more
          Caused by: java.lang.UnsupportedOperationException
          	at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
          	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
          	at org.apache.flink.api.scala.typeutils.OptionSerializer$OptionSerializerConfigSnapshot.<init>(OptionSerializer.scala:139)
          	at org.apache.flink.api.scala.typeutils.OptionSerializer.snapshotConfiguration(OptionSerializer.scala:104)
          	at org.apache.flink.api.scala.typeutils.OptionSerializer.snapshotConfiguration(OptionSerializer.scala:28)
          	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
          	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
          	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
          	at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
          	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
          	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
          	at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:183)
          	at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:47)
          	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
          	at org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot.<init>(MapSerializerConfigSnapshot.java:38)
          	at org.apache.flink.runtime.state.HashMapSerializer.snapshotConfiguration(HashMapSerializer.java:210)
          	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
          	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
          	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
          	... 13 more
          

          Then I applied your patch on the release-1.3.0 tag and used that with this code. The app without CEP still works fine, but the CEP app is not printing any error logs and no checkpointing is happening. It's showing

          Triggered: 1 In Progress: 0
          ID: 1 Failure Time: 18:48:41 Cause: Checkpoint was declined (tasks not ready)
          

          Even after restarting the job, it's not triggering checkpointing, and it's printing the log I mentioned above. To reproduce, I think you just need to use CEP with FsStateBackend.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Didn't you mention this error:

          2017-06-20 15:26:25,518 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom File Source (1/1) is not being executed at the moment. Aborting checkpoint.
          

          ?

          This doesn't seem to be the Kafka source. The checkpoint is failing because the source isn't running.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Shashank Agarwal

          I've tested CEP with Scala collections as the event type (which would then let the CEP operator use the TraversableSerializer internally), with the following code:

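          import org.apache.flink.cep.pattern.conditions.SimpleCondition
          import org.apache.flink.cep.scala.CEP
          import org.apache.flink.cep.scala.pattern.Pattern
          import org.apache.flink.streaming.api.scala._

          object FlinkCEPTest {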
            def main(args: Array[String]) {
              // set up the streaming execution environment
              val env = StreamExecutionEnvironment.getExecutionEnvironment
              env.enableCheckpointing(5000)
          
              val source: DataStream[scala.collection.immutable.List[java.lang.String]] = env.socketTextStream("localhost", 9999).map(x => List(x.split(",")(0)))
              val pattern = Pattern.begin("start").where(new SimpleCondition[List[String]] {
                override def filter(t: List[String]) = t.head.equals("a")
              }).times(4).allowCombinations().followedBy("end").where(new SimpleCondition[List[String]] {
                override def filter(t: List[String]) = t.head.equals("b")
              })
          
              CEP.pattern(source, pattern).select(_.toString()).print()
          
              // execute program
              env.execute("Flink CEP test")
            }
          }
          

          I can confirm that this works correctly without any errors in branch release-1.3. Checkpoint + restoring from savepoints works correctly.
          As I've mentioned, simply applying the commit for this JIRA onto release-1.3.0 may not work, as the whole fix includes other commits as well.

          Please let me know if you think otherwise!

          shashank734 Shashank Agarwal added a comment -

          So what is the release date for version 1.3.1? I don't want to build this from the master branch and then have to publish libraries like CEP etc. locally. Everything will change. Is there any quick fix that I can apply on release-1.3.0 along with this commit?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Shashank Agarwal I think the next release candidate for 1.3.1 will be pushed out today. The vote for it will happen on the mailing list, so hopefully you should be able to expect it to be released before next week.


            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              7
