Flink / FLINK-19970

State leak in CEP Operators (expired events/keys not removed from state)


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.11.2
    • Fix Version/s: 1.13.0
    • Component/s: Library / CEP
    • Environment:

      Flink 1.11.2 run using the official docker containers in AWS ECS Fargate.

      1 JobManager, 1 TaskManager with 2 vCPUs and 8 GB memory

      Description

      We have been observing instability in our production environment recently, seemingly related to state backends. We ended up building a load testing environment to isolate factors and have discovered that the CEP library appears to have some serious problems with state expiry.

      Job Topology

      Source: Kinesis (standard connector) -> keyBy() and forward to...
      CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward output to...
      Sink: SQS (custom connector)

      The CEP Patterns in the test look like this:

      Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
          .times(20)
          .subtype(ScanEvent.class)
          .within(Time.minutes(30));
      

      Taskmanager Config

      taskmanager.numberOfTaskSlots: $numberOfTaskSlots
      taskmanager.data.port: 6121
      taskmanager.rpc.port: 6122
      taskmanager.exit-on-fatal-akka-error: true
      taskmanager.memory.process.size: $memoryProcessSize
      taskmanager.memory.jvm-metaspace.size: 256m
      taskmanager.memory.managed.size: 0m
      jobmanager.rpc.port: 6123
      blob.server.port: 6130
      rest.port: 8081
      web.submit.enable: true
      fs.s3a.connection.maximum: 50
      fs.s3a.threads.max: 50
      akka.framesize: 250m
      akka.watch.threshold: 14
      
      state.checkpoints.dir: s3://$savepointBucketName/checkpoints
      state.savepoints.dir: s3://$savepointBucketName/savepoints
      state.backend: filesystem
      state.backend.async: true
      
      s3.access-key: $s3AccessKey
      s3.secret-key: $s3SecretKey
      

      (The substitutions are controlled by Terraform.)

      Tests

      Test 1 (No key rotation)

      8192 actors (different keys) emitting 1 Scan Event every 10 minutes indefinitely. Actors (keys) never rotate in or out.

      Test 2 (Constant key rotation)

      8192 actors that each produce 2 Scan Events 10 minutes apart, then retire and never emit again. The setup creates a new actor (key) as soon as one finishes, so we always have 8192 active. This test constantly rotates the key space.

      Results

      For both tests, the state size (checkpoint size) grows linearly and without bound, well past the 30-minute threshold that should have caused old keys or events to be discarded from the state. In the chart below, the left (steep) half is the 24 hours we ran Test 1; the right (shallow) half is Test 2. My understanding is that the checkpoint size should level off after roughly 45 minutes and then stay constant.
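
To make the expected plateau concrete, here is a minimal toy model of both tests in plain Java. It is not Flink code and does not reflect how the CEP operator stores partial matches internally. The class `ExpectedStateModel` and its methods are hypothetical names, and the model assumes that all actors emit in lockstep every 10 minutes, that in Test 2 each actor slot switches to a fresh key after every second event, and that an event is dropped once it is 30 or more minutes old.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical toy model of the expected behaviour; not Flink code.
final class ExpectedStateModel {
    static final long WINDOW_MIN = 30; // mirrors the 30-minute within(...) bound
    static final long PERIOD_MIN = 10; // one Scan Event per actor every 10 minutes

    // key -> timestamps (in minutes) of events still inside the window
    private final Map<Long, ArrayDeque<Long>> state = new HashMap<>();

    void onEvent(long key, long timestampMin) {
        state.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestampMin);
    }

    // Assumed expiry rule: drop events that are WINDOW_MIN or more minutes old,
    // then drop any key that has no events left.
    void expire(long nowMin) {
        Iterator<ArrayDeque<Long>> it = state.values().iterator();
        while (it.hasNext()) {
            ArrayDeque<Long> events = it.next();
            while (!events.isEmpty() && nowMin - events.peekFirst() >= WINDOW_MIN) {
                events.pollFirst();
            }
            if (events.isEmpty()) {
                it.remove();
            }
        }
    }

    int keyCount() {
        return state.size();
    }

    long eventCount() {
        long total = 0;
        for (ArrayDeque<Long> events : state.values()) {
            total += events.size();
        }
        return total;
    }

    // rotateKeys = false models Test 1; rotateKeys = true models Test 2,
    // where every actor slot moves to a new key after two events.
    static ExpectedStateModel run(int actors, long durationMin, boolean rotateKeys) {
        ExpectedStateModel model = new ExpectedStateModel();
        for (long tick = 0; tick * PERIOD_MIN <= durationMin; tick++) {
            long now = tick * PERIOD_MIN;
            long generation = rotateKeys ? tick / 2 : 0;
            for (long slot = 0; slot < actors; slot++) {
                model.onEvent(generation * actors + slot, now);
            }
            model.expire(now);
        }
        return model;
    }

    public static void main(String[] args) {
        ExpectedStateModel test1 = run(8192, 24 * 60, false);
        ExpectedStateModel test2 = run(8192, 24 * 60, true);
        System.out.println("Test 1: keys=" + test1.keyCount() + " events=" + test1.eventCount());
        System.out.println("Test 2: keys=" + test2.keyCount() + " events=" + test2.eventCount());
    }
}
```

Under these assumptions both tests settle at 24576 retained events from the 20-minute mark onward (8192 keys in Test 1, 16384 keys in Test 2), regardless of how long the run lasts. In this model no key ever holds more than three events at once, so the 20 events required by times(20) are never reached and cleanup depends entirely on the 30-minute timeout.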

      Could someone please assist us with this? Unless we have dramatically misunderstood how the CEP library is supposed to function, this seems like a pretty severe bug.

        Attachments

        1. screenshot-3.png
          86 kB
          Thomas Wozniakowski
        2. screenshot-2.png
          59 kB
          Thomas Wozniakowski
        3. screenshot-1.png
          30 kB
          Thomas Wozniakowski
        4. image-2020-11-04-11-35-12-126.png
          132 kB
          Thomas Wozniakowski


              People

              • Assignee:
                dwysakowicz Dawid Wysakowicz
                Reporter:
                Jamalarm Thomas Wozniakowski
              • Votes:
                0
                Watchers:
                9
