Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-9268

RockDB errors from WindowOperator

    XMLWordPrintableJSON

Details

    Description

      The job has no sinks, one Kafka source, does a windowing based on session and uses processing time. The job fails with the error given below after running for few hours. The only way to recover from this error is to cancel the job and start a new one.

      Using S3 backend for externalized checkpoints.

      A representative job DAG:

      val streams = sEnv
      .addSource(makeKafkaSource(config))
      .map(makeEvent)
      .keyBy(_.get(EVENT_GROUP_ID))
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
      .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
      .apply(makeEventsList)
      .addSink(makeNoOpSink)

      A representative config:

      state.backend=rocksDB

      checkpoint.enabled=true
      external.checkpoint.enabled=true
      checkpoint.mode=AT_LEAST_ONCE
      checkpoint.interval=900000
      checkpoint.timeout=300000

      Error:

      TimerException{java.lang.NegativeArraySizeException}
      at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.NegativeArraySizeException
      at org.rocksdb.RocksDB.get(Native Method)
      at org.rocksdb.RocksDB.get(RocksDB.java:810)
      at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
      at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
      at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
      at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              narayaruna Narayanan Arunachalam
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: