Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-7830 Problematic interaction of CEP and asynchronous snapshots
  3. FLINK-7435

FsStateBackend with incremental backup enable does not work with Keyed CEP

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.3.1, 1.3.2
    • 1.4.0
    • Library / CEP
    • None
    • AWS EMR YARN, use CEP with pattern start -> next (oneOrMore.Optional.Consective) -> next(end). Store it with FsStatebackend with Incremental option open.

    Description

      java.lang.RuntimeException: Exception occurred while processing valve output watermark:
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
      at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.RuntimeException: Could not copy NFA.
      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
      at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
      at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
      at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
      at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
      at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
      at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
      ... 7 more
      Caused by: java.io.StreamCorruptedException: invalid type code: 00
      at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
      at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
      at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
      at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
      at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
      at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
      at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
      ... 17 more

      Attachments

        Activity

          People

            kkl0u Kostas Kloudas
            lidaiqing daiqing
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: