Flink / FLINK-9233

Merging state may cause runtime exception when windows trigger onMerge


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.4.0
    • None
    • None

    Description

      The main logic of my Flink job is as follows:

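      clickStream.coGroup(exposureStream).where(...).equalTo(...)
          .window(EventTimeSessionWindows.withGap(...))
          .trigger(new SessionMatchTrigger())
          .evictor(...)
          .apply(...);
      
      class SessionMatchTrigger extends Trigger<..., TimeWindow> {
      
          private final ReducingStateDescriptor<...> stateDesc = new ReducingStateDescriptor<>(...);
      ...
          @Override
          public boolean canMerge() {
              return true;
          }
      
          @Override
          public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
              // Merge the trigger's ReducingState of the merged windows into the new window.
              ctx.mergePartitionedState(this.stateDesc);
              // Re-register the end-of-window timer for the merged window.
              ctx.registerEventTimeTimer(window.maxTimestamp());
          }
      ...
      }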
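For context on when onMerge fires: with EventTimeSessionWindows, each element starts its own window [timestamp, timestamp + gap), and overlapping windows are merged into one covering window. That merge is what calls a merging trigger's onMerge (via MergingWindowSet.addWindow in the stack trace). The following is a minimal, self-contained sketch of that merging rule under stated assumptions, not Flink's code: the class and method names are hypothetical, and the overlap test (windows that merely touch also merge) is assumed to follow Flink's TimeWindow.intersects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of event-time session windows with merging; NOT Flink's code.
// Each element opens [ts, ts + gap); overlapping or touching windows are merged.
public class SessionMergeDemo {

    public static final class Window {
        public final long start; // inclusive
        public final long end;   // exclusive

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last timestamp still inside the window (assumed to mirror TimeWindow.maxTimestamp()).
        public long maxTimestamp() {
            return end - 1;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        // Smallest window covering both this window and the other one.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Adds elements one at a time; every existing window that overlaps the new
    // one is absorbed into it (the point where a trigger's onMerge would run).
    public static List<Window> assign(long[] timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            Window merged = new Window(ts, ts + gap);
            List<Window> untouched = new ArrayList<>();
            for (Window existing : windows) {
                if (existing.intersects(merged)) {
                    merged = merged.cover(existing);
                } else {
                    untouched.add(existing);
                }
            }
            untouched.add(merged);
            windows = untouched;
        }
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(assign(new long[] {1000, 3000, 8000}, 3000));       // [[1000, 6000), [8000, 11000)]
        System.out.println(assign(new long[] {1000, 3000, 8000, 5500}, 3000)); // [[1000, 11000)]
    }
}
```

With a 3000 ms gap, the element at 5500 bridges the sessions [1000, 6000) and [8000, 11000), so a trigger like SessionMatchTrigger would merge its state into [1000, 11000) and register a timer at that window's maxTimestamp(), 10999.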
      
      Detailed error log:

      java.lang.RuntimeException: Error while merging state.
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:895)
      at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:56)
      at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:14)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onMerge(WindowOperator.java:939)
      at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:141)
      at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:120)
      at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
      at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:119)
      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Error while merging state in RocksDB
      at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:186)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:887)
      ... 12 more
      Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
      at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
      at org.rocksdb.Status.<init>(Status.java:30)
      at org.rocksdb.RocksDB.delete(Native Method)
      at org.rocksdb.RocksDB.delete(RocksDB.java:1110)
      at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:143)
      ... 13 more
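The innermost Flink frame is RocksDBReducingState.mergeNamespaces, which fails inside RocksDB.delete. As a rough sketch of what merging a ReducingState involves, the model below reads and deletes each source window's value, reduces the values together, and writes the result under the merged (target) window. It is a simplified in-memory model, not the Flink implementation: a HashMap stands in for the RocksDB column family, Map.remove stands in for RocksDB.delete, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical in-memory model of merging a ReducingState across windows;
// NOT Flink's RocksDBReducingState. Keys are window labels, values partial results.
public class ReducingMergeDemo {

    public static <V> void mergeNamespaces(Map<String, V> store, String target,
                                           List<String> sources, BinaryOperator<V> reduce) {
        V current = null;
        for (String source : sources) {
            // Read and delete each source window's value; in the report, the
            // corresponding RocksDB.delete call is where the exception surfaced.
            V value = store.remove(source);
            if (value != null) {
                current = (current == null) ? value : reduce.apply(current, value);
            }
        }
        if (current != null) {
            // Fold in any value already stored for the target window, then write it.
            V existing = store.get(target);
            store.put(target, existing == null ? current : reduce.apply(current, existing));
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        counts.put("[1000, 6000)", 2L);  // e.g. a count of 2 for the first session
        counts.put("[8000, 11000)", 1L); // and 1 for the second session
        mergeNamespaces(counts, "[1000, 11000)",
                Arrays.asList("[1000, 6000)", "[8000, 11000)"), Long::sum);
        System.out.println(counts); // {[1000, 11000)=3}
    }
}
```

Because the source entries are deleted before the merged value is written, a storage-level failure during the delete aborts the whole merge, which matches the "Error while merging state in RocksDB" wrapper in the log.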

       

      I found the cause of this error: Java's {RocksDB.Status.SubCode} was out of sync with the native {include/rocksdb/status.h:SubCode}.
      When the disk ran out of space, this led to an {IllegalArgumentException} because of an invalid status code, rather than the corresponding status code simply being returned without an exception.
      More details: <https://github.com/facebook/rocksdb/pull/3050>
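The failure mode can be illustrated with a hypothetical, simplified enum; this is not the RocksDB source. It assumes the native library reports a subcode byte (here 4, standing for "no space") that the stale Java enum does not list. A strict lookup then throws IllegalArgumentException with the message seen in the stack trace, while an enum kept in sync decodes the same byte normally.

```java
// Hypothetical, simplified model of the SubCode mismatch; NOT the RocksDB source.
// Enum names and byte values are assumptions, with 4 standing for "no space".
public class SubCodeDemo {

    // Java-side enum that is out of sync with the native list: it stops at 3.
    enum StaleSubCode {
        NONE((byte) 0), MUTEX_TIMEOUT((byte) 1), LOCK_TIMEOUT((byte) 2), LOCK_LIMIT((byte) 3);

        private final byte value;

        StaleSubCode(byte value) {
            this.value = value;
        }

        // Strict lookup: any byte without a matching constant is rejected.
        static StaleSubCode getSubCode(byte value) {
            for (StaleSubCode code : values()) {
                if (code.value == value) {
                    return code;
                }
            }
            throw new IllegalArgumentException("Illegal value provided for SubCode.");
        }
    }

    // The same enum kept in sync with the (assumed) native list.
    enum SyncedSubCode {
        NONE((byte) 0), MUTEX_TIMEOUT((byte) 1), LOCK_TIMEOUT((byte) 2), LOCK_LIMIT((byte) 3),
        NO_SPACE((byte) 4);

        private final byte value;

        SyncedSubCode(byte value) {
            this.value = value;
        }

        static SyncedSubCode getSubCode(byte value) {
            for (SyncedSubCode code : values()) {
                if (code.value == value) {
                    return code;
                }
            }
            throw new IllegalArgumentException("Illegal value provided for SubCode.");
        }
    }

    // Decodes a native subcode with the stale enum, reporting the exception as text.
    public static String decodeStale(byte nativeSubCode) {
        try {
            return StaleSubCode.getSubCode(nativeSubCode).name();
        } catch (IllegalArgumentException e) {
            return "IllegalArgumentException: " + e.getMessage();
        }
    }

    public static String decodeSynced(byte nativeSubCode) {
        return SyncedSubCode.getSubCode(nativeSubCode).name();
    }

    public static void main(String[] args) {
        byte noSpace = 4; // assumed native subcode for a full disk
        System.out.println(decodeStale(noSpace));  // IllegalArgumentException: Illegal value provided for SubCode.
        System.out.println(decodeSynced(noSpace)); // NO_SPACE
    }
}
```

In the stale version, the real error (a full disk) is masked by an unrelated-looking IllegalArgumentException thrown while building the status object, which is why the Flink log does not mention disk space at all.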

    People

      Assignee: Unassigned
      Reporter: Hai Zhou (yew1eb)
      Votes: 0
      Watchers: 6
