Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-4341

Kinesis connector does not emit maximum watermark properly

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 1.1.0, 1.1.1
    • 1.1.2, 1.2.0
    • Connectors / Common
    • None

    Description

      *Prevously reported as "Checkpoint state size grows unbounded when task parallelism not uniform"*

      This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously using a 1.1.0 snapshot (commit 18995c8) which performed as expected. This issue was introduced somewhere between those commits.

      I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots. When running the application with a parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for subsequent tasks that process the Kinesis stream data. I use an in-memory store for checkpoint data.

      Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:

      ID  State Size
      1   11.3 MB
      2    20.9 MB
      3   30.6 MB
      4   41.4 MB
      5   52.6 MB
      6   62.5 MB
      7   71.5 MB
      8   83.3 MB
      9   93.5 MB
      

      The first 4 checkpoints generally succeed, but then fail with an exception like the following:

      java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
        at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
        ... 8 more
      

      Or:

      2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
      2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter            - Transient association error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
      

      This can be fixed by simply submitting the job with a parallelism of 2. I suspect there was a regression introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1-4). This is currently preventing me from using all available Task Manager slots.

      Attachments

        Issue Links

          Activity

            People

              tzulitai Tzu-Li (Gordon) Tai
              skidder Scott Kidder
              Votes:
              0 Vote for this issue
              Watchers:
              10 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: