*Prevously reported as "Checkpoint state size grows unbounded when task parallelism not uniform"*
This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously using a 1.1.0 snapshot (commit 18995c8) which performed as expected. This issue was introduced somewhere between those commits.
I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots. When running the application with a parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for subsequent tasks that process the Kinesis stream data. I use an in-memory store for checkpoint data.
Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:
ID State Size
1 11.3 MB
2 20.9 MB
3 30.6 MB
4 41.4 MB
5 52.6 MB
6 62.5 MB
7 71.5 MB
8 83.3 MB
9 93.5 MB
The first 4 checkpoints generally succeed, but then fail with an exception like the following:
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
... 8 more
2016-08-09 17:44:43,626 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp:
This can be fixed by simply submitting the job with a parallelism of 2. I suspect there was a regression introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1-4). This is currently preventing me from using all available Task Manager slots.