FLINK-4341: Kinesis connector does not emit maximum watermark properly

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0, 1.1.1
    • Fix Version/s: 1.2.0, 1.1.2
    • Component/s: Streaming Connectors
    • Labels:
      None

      Description

      *Previously reported as "Checkpoint state size grows unbounded when task parallelism not uniform"*

      This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously using a 1.1.0 snapshot (commit 18995c8), which performed as expected. The issue was introduced somewhere between those commits.

      I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots. When running the application with a parallelism of 4, the Kinesis consumer occupies 2 slots (one per Kinesis shard), while the subsequent tasks that process the Kinesis stream data use all 4 slots. I use an in-memory store for checkpoint data.
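      For context, a minimal sketch of this kind of setup (the actual job was not shared on this issue; the stream name and property key below are assumptions, the windowing and sink are omitted, and property-key strings may differ slightly between connector versions):

      import java.util.Properties;

      import org.apache.flink.runtime.state.memory.MemoryStateBackend;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
      import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

      public class KinesisJobSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Job-wide parallelism of 4, in-memory checkpoints every 10 seconds, as described above.
              env.setParallelism(4);
              env.enableCheckpointing(10000);
              env.setStateBackend(new MemoryStateBackend());

              Properties consumerConfig = new Properties();
              consumerConfig.setProperty("aws.region", "us-east-1"); // assumed region; credentials also required

              // The source reads from a 2-shard stream, so only 2 of the 4 source subtasks get a shard.
              env.addSource(new FlinkKinesisConsumer<>(
                              "my-stream",               // hypothetical stream name
                              new SimpleStringSchema(),
                              consumerConfig))
                 .print();                               // windowing and InfluxDB sink omitted

              env.execute("Kinesis consumer sketch");
          }
      }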

      Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:

      ID  State Size
      1   11.3 MB
      2   20.9 MB
      3   30.6 MB
      4   41.4 MB
      5   52.6 MB
      6   62.5 MB
      7   71.5 MB
      8   83.3 MB
      9   93.5 MB
      

      The first 4 checkpoints generally succeed, but then fail with an exception like the following:

      java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
        at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
        at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
        at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
        at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
        ... 8 more
      

      Or:

      2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
      2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
      akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
      

      This can be fixed by simply submitting the job with a parallelism of 2. I suspect a regression was introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1 to 4). This is currently preventing me from using all available Task Manager slots.
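      For completeness, the two stop-gap options on the user side are submitting with a lower parallelism (e.g. `bin/flink run -p 2 ...`) or switching to the file system state backend; the latter only avoids the 5 MB memory-backend cap from the first exception and does not address the unbounded state growth. A minimal sketch, with an assumed checkpoint path and a placeholder pipeline:

      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class FsBackendSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // Checkpoints go to a file system instead of the JobManager heap; this avoids the
              // 5 MB MemoryStateBackend limit but does not fix the growing window state itself.
              env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints")); // assumed path
              env.fromElements("a", "b", "c").print(); // placeholder for the real pipeline
              env.execute("FsStateBackend sketch");
          }
      }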

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2414

          Thanks @rmetzger !

          rmetzger Robert Metzger added a comment -

          Resolved for master in http://git-wip-us.apache.org/repos/asf/flink/commit/7b574cf5
          Resolved for 1.1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/81f30c5e

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2414

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2414

          Thank you for the pull request. I'll merge it to master and the release-1.1 branch.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2414

          @rmetzger, @aljoscha the changes are ready for another review now, thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2414

          To cover the missing case @rmetzger mentioned, it turns out the fix is more complicated than I expected: correctly determining state after every reshard requires a bit of rework of our current shard discovery mechanism to get right.

          Heads-up that this will probably need a re-review. Sorry for the delay; I'm still on it and hope to update the PR by the end of today.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2414

          Minus @rmetzger's comment this looks good to merge! Thanks for fixing this @tzulitai!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2414

          Ah yes, correct. I'll update this soon.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2414

          Thank you for opening a pull request to fix the issue.

          I think we also need to cover another case: What happens when the number of shards has been reduced in a resharding and some fetchers are now without a shard? I think in that case, the worker also needs to emit a final Long.MAX_VALUE, and it has to fail once it gets a shard assigned again.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2414

          FLINK-4341 Let idle consumer subtasks emit max value watermarks and fail on resharding

          This is a short-term fix, until the min-watermark service for the JobManager described in the JIRA discussion is available.

          The way this fix works is that we let idle subtasks that initially don't get assigned shards emit a `Long.MAX_VALUE` watermark. Also, we only fail hard if an idle subtask is assigned new shards when resharding happens, to avoid messing up the watermarks. So, as long as no subtask is initially idle on startup (i.e., when the total number of shards > consumer parallelism), the Kinesis consumer can still transparently handle resharding like before without failing.

          I've tested exactly-once with our manual tests (with and without resharding) and the fix works nicely, still retaining the exactly-once guarantee despite the loss of transparency. However, I'm a bit unsure how to test whether the unbounded state with window operators is also fixed by this change, so that still needs to be verified.

          R: @rmetzger and @aljoscha for review. Thanks in advance!

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-4341

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2414.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2414


          commit bc8e50d99be745300f7418c58e9d30abc5469ba3
          Author: Gordon Tai <tzulitai@gmail.com>
          Date: 2016-08-24T08:38:06Z

          FLINK-4341 Let idle consumer subtasks emit max value watermarks and fail on resharding

          This no longer allows the Kinesis consumer to transparently handle resharding.
          This is a short-term workaround until we have a min-watermark notification service available in the JobManager.
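          A rough illustration of the behavior described in this pull request (this is not the code from PR #2414; the class and method names are hypothetical):

          import java.util.List;

          import org.apache.flink.streaming.api.functions.source.SourceFunction;
          import org.apache.flink.streaming.api.watermark.Watermark;

          // Hypothetical sketch only; the real change lives in the Kinesis consumer/fetcher code.
          class IdleSubtaskWatermarkSketch {
              private boolean emittedMaxWatermark = false;

              // Called once on startup with the shards initially assigned to this subtask.
              <T> void onStartup(List<String> assignedShardIds, SourceFunction.SourceContext<T> ctx) {
                  if (assignedShardIds.isEmpty()) {
                      // Idle subtask: emit a final watermark so downstream event-time operators
                      // (e.g. the WindowOperator) can still advance and fire.
                      ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                      emittedMaxWatermark = true;
                  }
              }

              // Called when shard discovery assigns shards to this subtask after a reshard.
              void onShardsAssignedAfterReshard(List<String> newShardIds) {
                  if (emittedMaxWatermark && !newShardIds.isEmpty()) {
                      // This subtask already declared itself finished; consuming now would emit
                      // records behind a Long.MAX_VALUE watermark, so fail hard instead.
                      throw new IllegalStateException(
                              "Idle Kinesis consumer subtask was assigned shards after a reshard");
                  }
              }
          }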


          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I agree. I've been trying things out for the past few days, and I think this is the only way to go for now.
          I'll see if there's an easy way to deactivate this and add the Long.MAX_VALUE back in the current code without reverting.

          I'm also quite interested in working on the low watermark service for the JobManager. However, since there seems to be an ongoing effort to rework the JobManager, I wonder whether it makes sense to implement this service in the current JobManager now.

          rmetzger Robert Metzger added a comment -

          For the short term: I think we have to disable the transparent resharding support in Kinesis.
          Before we can add support for partition discovery in Kafka (FLINK-4022), and before reactivating resharding in Kinesis, we need to implement a low watermark service in the JobManager.

          skidder Scott Kidder added a comment -

          FYI, I've confirmed that explicitly setting the parallelism on the Kinesis source to equal the number of shards (e.g. 2) allows the remaining application stages to run at larger parallelism values (e.g. 4) successfully.
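          A minimal sketch of this workaround, assuming a 2-shard stream and a hypothetical stream name/config:

          import java.util.Properties;

          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
          import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

          public class SourceParallelismWorkaround {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(4); // downstream stages keep the full parallelism

                  Properties consumerConfig = new Properties();
                  consumerConfig.setProperty("aws.region", "us-east-1"); // assumed region; credentials omitted

                  DataStream<String> kinesis = env
                          .addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig))
                          .setParallelism(2); // pin the source to the shard count so no subtask is idle

                  kinesis.print(); // runs at the job-wide parallelism of 4

                  env.execute("Kinesis source parallelism workaround");
              }
          }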

          StephanEwen Stephan Ewen added a comment -

          Aljoscha Krettek This is probably a good long term solution.
          We still need to see if we can find a more lightweight fix that we can apply to 1.1.2.

          aljoscha Aljoscha Krettek added a comment -

          Tzu-Li (Gordon) Tai We (Stephan and I) were quickly bouncing around some ideas, and what we came up with as a likely solution is to introduce a service in the JobManager (per source) that receives messages from all sources about their min-watermark. The service would then send the min-watermark back out to all sources, so that sources that don't have a watermark of their own can emit it as their watermark.

          This should work because the watermark should not decrease when resharding occurs, since the underlying data is hopefully still the same.
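          A rough sketch of what such a per-source service could look like (purely illustrative; nothing like this exists in the JobManager at this point):

          import java.util.HashMap;
          import java.util.Map;

          // Illustrative only: tracks the watermark reported by each source subtask and hands
          // back the minimum, which idle subtasks could then emit as their own watermark.
          class MinWatermarkServiceSketch {
              private final Map<Integer, Long> watermarkPerSubtask = new HashMap<>();

              // Each source subtask periodically reports its current watermark.
              synchronized long report(int subtaskIndex, long watermark) {
                  watermarkPerSubtask.put(subtaskIndex, watermark);
                  return currentMin();
              }

              // The minimum across all reporting subtasks is sent back to the sources.
              synchronized long currentMin() {
                  long min = Long.MAX_VALUE;
                  for (long w : watermarkPerSubtask.values()) {
                      min = Math.min(min, w);
                  }
                  return min;
              }
          }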

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I can take a look at fixing this after I get back from vacation this week. Thanks, Aljoscha Krettek, for pointing this out.

          One question: what would be a reasonable way to fix this? The rework in 17dfd68d05be991b58ffe4b9252b09ca61ec8f05 took away the Long.MAX_VALUE because parallel source instances that initially have no shards assigned to them may still get shards later due to resharding of Kinesis streams. I'm working on repartitioning for our Kafka connectors too, and we'll likely need to deal with this there as well.

          rmetzger Robert Metzger added a comment -

          The reason why the Kinesis consumer is not emitting the Long.MAX_VALUE watermark is to support transparent resharding.
          So if the number of shards changes, inactive consumer instances might start consuming records. If we send a Long.MAX_VALUE, the watermarks would be messed up.

          This means that we need to fail the Kinesis consumer if a resharding has occurred.

          StephanEwen Stephan Ewen added a comment - - edited

          Thanks, Aljoscha Krettek for debugging this. Robert Metzger and Tzu-Li (Gordon) Tai do you think you can have a look at this? Sounds like this should go into the next version and bugfix release.

          aljoscha Aljoscha Krettek added a comment -

          The regression was introduced in commit 17dfd68d05be991b58ffe4b9252b09ca61ec8f05. The problem is that before, the Kinesis source was emitting a Long.MAX_VALUE watermark if no shards were assigned to it. With the change, the Kinesis source no longer does this. That means it will not emit watermarks, which in turn means that downstream watermarks, i.e. at the window operators, don't advance. Thus, the WindowOperator simply accumulates state for all the windows that it keeps and never fires/purges that state. This explains the growing state size.

          skidder Scott Kidder added a comment -

          I repeated my tests using Flink 1.1.1 and continue to have problems with the checkpoint growing in size until it reaches the Akka message size limit (I override it to allow up to 30 MB) or heap memory is exhausted on the task managers. It's as though old data that's outside the window continues to occupy memory.

          StephanEwen Stephan Ewen added a comment -

          Scott Kidder Do you set something like "allowed lateness" in the window operator?

          It would be great if you could share your program (via private mail to stephan@data-artisans.com); then we can take a look and see whether we can spot anything suspicious.

          rmetzger Robert Metzger added a comment -

          In my tests so far, the Kinesis consumer is not using much memory (a few KBs) in a 2-TaskManager, 4-slot setup with 2 shards.
          From the reported exception, it seems that the WindowOperator is exceeding the limit of the memory state backend, not the Kinesis consumer.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I just did a test on revision 45f7825111232f0dd225068a72d8a092f67d49d0 (slow), with 2 Kinesis shards and a source parallelism of 4. The job consisted only of the Kinesis consumer source and print as sink (the ManualConsumerProducerTest in our manual tests, with checkpointing enabled). However, I did not experience a significant slowdown either with or without checkpointing, and I also did not observe unbounded checkpoint size.

          I'm afraid I currently don't have any idea of what may be causing the reported issue in the Kinesis connector. I'll keep this in mind.

          StephanEwen Stephan Ewen added a comment -

          Scott, we are trying to set up a test to see if some changes in the Kinesis connector between the revisions you mentioned may be responsible.

          To confirm: We will test revision 18995c8c7b8f9c75035b1a95a00379944c6f2b0c (fast) against revision 45f7825111232f0dd225068a72d8a092f67d49d0 (slow)

          Tzu-Li (Gordon) Tai Do you have a suspicion or hunch what we should look out for?

          Concerning the latest commits from release-1.1 branch: The release-1.1 branch has the 1.1.x versions in progress. After the 1.1.0 release candidate was created, new fixes were added that will go into 1.1.x versions. The 1.1.1 release fixes only the maven deployment of the 1.1.0 release (no code changes) and the 1.1.2 release will contain all the commits in the release-1.1 branch at that point.

          rmetzger Robert Metzger added a comment -

          I'll implement a little testing job to see if Kinesis is causing the issues with the growing state.

          skidder Scott Kidder added a comment -

          Unfortunately I don't have the time to develop a test application to demonstrate this issue without Kinesis. It might be reproducible with Kafka when the number of shards (or the Kafka equivalent) is less than the parallelism specified on the job.

          Also, I noticed that the 1.1.0 binaries available for download were created on August 4 and don't include the latest commits on the release-1.1 branch in Git. Do you know when they'll be updated? I deploy Flink as a Docker container and use the Flink tar-gzip binary to build the Docker image.

          StephanEwen Stephan Ewen added a comment -

          Thanks for reporting this issue. Sounds pretty serious.

          It would be important to see whether this is a general checkpointing regression or a change in the behavior of the Kinesis connector since the snapshot version you used.

          Do you have a way to test this job with a testdata-generating source, i.e. without Kinesis?

          skidder Scott Kidder added a comment -

          I also noticed that when checkpointing is enabled and I'm using a parallelism of 2, the processing speed is extremely slow compared to Flink 18995c8. I disabled checkpointing altogether and the speed returned to previous levels.

          I'm currently building Flink from source to pull in hotfixes added to the release-1.1 branch since commit 45f7825. I'll update this issue with my findings.


            People

            • Assignee: Tzu-Li (Gordon) Tai
            • Reporter: Scott Kidder
            • Votes: 0
            • Watchers: 10
