Details
- Type: Bug
- Status: Closed
- Priority: Minor
- Resolution: Information Provided
- Affects Version/s: 1.4.2
- Fix Version/s: None
Description
The job has one Kafka source and no real sink (only a no-op sink), uses session windows, and runs on processing time. After running for a few hours it fails with the error given below. The only way to recover from this error is to cancel the job and start a new one.
The job uses S3 as the backend for externalized checkpoints.
A representative job DAG:
val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)
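To make the windowing in this DAG concrete, here is a minimal plain-Scala sketch of how a 60-second session gap groups events by arrival time. It does not use Flink; `SessionGapSketch` and `sessions` are hypothetical names, and keeping a timestamp that arrives exactly one gap after the previous one in the same session is an assumption of the sketch.

```scala
object SessionGapSketch {
  // Splits ascending timestamps (in milliseconds) into sessions.
  // A new session starts whenever the distance to the previous
  // timestamp is larger than gapMs (boundary handling is an assumption).
  def sessions(timestampsMs: Seq[Long], gapMs: Long): Vector[Vector[Long]] =
    timestampsMs.foldLeft(Vector.empty[Vector[Long]]) { (acc, ts) =>
      acc.lastOption match {
        // Close enough to the last event: extend the current session.
        case Some(current) if ts - current.last <= gapMs =>
          acc.init :+ (current :+ ts)
        // First event, or the gap was exceeded: open a new session.
        case _ =>
          acc :+ Vector(ts)
      }
    }
}
```

With the sample input `0, 30000, 80000, 200000` and a gap of `60000`, the first three timestamps fall into one session and the last one starts a new session. A key whose events never pause for the full gap would keep extending a single session, so its buffered events would keep accumulating until the window finally fires.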
A representative config:
state.backend=rocksDB
checkpoint.enabled=true
external.checkpoint.enabled=true
checkpoint.mode=AT_LEAST_ONCE
checkpoint.interval=900000
checkpoint.timeout=300000
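Most of the keys above appear to be application-level settings rather than Flink's built-in option names, so their units are not stated. As a small sketch, assuming `checkpoint.interval` and `checkpoint.timeout` are in milliseconds, the values can be loaded and converted to readable durations in plain Scala; `CheckpointConfigSketch`, `load`, and `millis` are hypothetical names.

```scala
import java.io.StringReader
import java.util.Properties
import scala.concurrent.duration._

object CheckpointConfigSketch {
  // The representative config from the report, copied verbatim.
  val raw: String =
    """state.backend=rocksDB
      |checkpoint.enabled=true
      |external.checkpoint.enabled=true
      |checkpoint.mode=AT_LEAST_ONCE
      |checkpoint.interval=900000
      |checkpoint.timeout=300000""".stripMargin

  // Parses key=value lines into java.util.Properties.
  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Reads a key as a duration, ASSUMING the value is in milliseconds.
  def millis(props: Properties, key: String): FiniteDuration =
    props.getProperty(key).toLong.millis
}
```

Under that assumption the checkpoint interval is 15 minutes and the checkpoint timeout is 5 minutes.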
Error:
TimerException{java.lang.NegativeArraySizeException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
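One possible reading of the `Caused by` frames, which this report does not confirm: the RocksDB JNI bridge returns values as Java byte arrays, whose length is a signed 32-bit `Int`, and Flink's documentation notes that state built with RocksDB merge operations (such as `ListState`) can grow past 2^31 bytes and then fail on the next read. The sketch below only illustrates the arithmetic of such an overflow in plain Scala; `narrowToInt` and `tryAllocate` are hypothetical helpers, and nothing here calls RocksDB or Flink.

```scala
object NegativeSizeSketch {
  // Hypothetical helper: narrows a 64-bit byte count to a 32-bit Int,
  // silently wrapping around for values above Int.MaxValue.
  def narrowToInt(sizeInBytes: Long): Int = sizeInBytes.toInt

  // Hypothetical helper: tries to allocate a byte array of the given size
  // and reports either the allocated length or the exception class name.
  def tryAllocate(size: Int): Either[String, Int] =
    try Right(new Array[Byte](size).length)
    catch {
      case e: NegativeArraySizeException => Left(e.getClass.getName)
    }

  def main(args: Array[String]): Unit = {
    val threeGiB = 3L << 30             // 3 GiB expressed as a Long
    val wrapped  = narrowToInt(threeGiB) // wraps to a negative Int
    println(s"$threeGiB bytes narrowed to Int: $wrapped")
    println(s"allocation result: ${tryAllocate(wrapped)}")
  }
}
```

If this is what happened here, the list state of a single session window would have had to exceed roughly 2 GiB, which could be checked by looking for keys whose sessions never close.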
Issue Links
- Blocked: FLINK-9291 Checkpoint failure (CIRCULAR REFERENCE:java.lang.NegativeArraySizeException) (Closed)