[FLINK-6537] Umbrella issue for fixes to incremental snapshots

      Description

      This issue tracks ongoing fixes in the incremental checkpointing feature for the 1.3 release.

        Activity

        Stefan Richter added a comment -

        So this is not strictly related to incremental checkpoints, but comes from a different problem. I know how to easily fix this for the release, and found that some things about this code could be nicer in general (can refactor in 1.4). Tracking in FLINK-6685 and FLINK-6684.

        Gyula Fora added a comment -

        Thanks a lot for looking into this. I won't be able to verify any fix today as I am travelling to London, but I can try to do it tomorrow!

        Stefan Richter added a comment -

        Ok, I think I have an idea about a possible problem, but it makes me wonder how this code has ever worked at all. I will double check and then come up with a fix.

        Stefan Richter added a comment - edited

        I had a look at those logs, but they seem strange to me. From the exceptions, the root cause seems to be that the SafetyNetWrapperFileSystem has a registry that is already closed, which can only happen in two places in Task, one correlated with checkpointing in line 834. We are using a thread-local variable to hold the registry, but this variable is re-initialized with a fresh registry for each checkpoint runnable. Since we are not using InheritableThreadLocal, there should be no leaking to other threads, e.g. through thread pools. From the log, I also cannot see the precondition failure that would indicate that there is already another registry in place. So right now, the only explanation I see is that an old, closed registry is somehow leaked, but I cannot see how this is possible. For debugging purposes, we could print the references of the created, closed, and used registries that wrap the streams, to check whether an old registry somehow survives where it should not.
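        To make the thread-local point a bit more concrete, here is a rough sketch of the pattern I mean (the class names are simplified placeholders, not the actual Flink code): the registry lives in a plain ThreadLocal that is replaced with a fresh instance for each checkpoint runnable, so pooled threads cannot inherit it, and the debug output shows which registry instance gets installed and closed.

        {code:java}
import java.io.Closeable;
import java.io.IOException;

final class SafetyNetSketch {

    // Plain ThreadLocal (not InheritableThreadLocal): threads spawned from here,
    // e.g. via a thread pool, do NOT inherit the registry reference.
    private static final ThreadLocal<CloseableRegistry> REGISTRY = new ThreadLocal<>();

    static void runCheckpointRunnable(Runnable checkpointLogic) {
        CloseableRegistry fresh = new CloseableRegistry();
        // Debug output: identity of the registry installed for this checkpoint.
        System.out.println("installing registry " + System.identityHashCode(fresh));
        REGISTRY.set(fresh);
        try {
            checkpointLogic.run();
        } finally {
            System.out.println("closing registry " + System.identityHashCode(fresh));
            fresh.close();
            REGISTRY.remove();
        }
    }

    /** Minimal stand-in for the safety-net registry; only tracks the closed flag. */
    static final class CloseableRegistry implements Closeable {
        private boolean closed;

        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                // This is the kind of failure visible in the logs: a stream is
                // registered against a registry that has already been closed.
                throw new IOException("Registry already closed");
            }
        }

        @Override
        public synchronized void close() {
            closed = true;
        }
    }
}
        {code}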

        I also tried to reproduce the problem, but unfortunately could never see it in any test runs. Is there something very special about your setup, or are you using a customized version of Flink? For a test run, you could also deactivate the safety net in FileSystem line 389 and check if this solves all the problems. I still wonder how this can actually cause trouble, in particular as this is already a Flink 1.2 feature.

        Edit: Shouldn't this also cause savepoints with all other backends to fail?
        Edit 2: Ok, one way for this to happen is that the FileSystem is cached somewhere where it shouldn't be (in particular: without previously unwrapping it).
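        To illustrate what I mean by "without previously unwrapping it", here is a minimal sketch with placeholder names (not the real Flink classes): any cache of FileSystem instances should strip the safety-net wrapper first, otherwise a registry that is closed at the end of one checkpoint leaks into later operations through the cached wrapper.

        {code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class FileSystemCacheSketch {

    interface FileSystem {}

    /** Placeholder for the wrapper that ties a FileSystem to a per-checkpoint registry. */
    static final class SafetyNetWrapper implements FileSystem {
        final FileSystem delegate;
        SafetyNetWrapper(FileSystem delegate) { this.delegate = delegate; }
    }

    private static final Map<String, FileSystem> CACHE = new ConcurrentHashMap<>();

    static void cache(String scheme, FileSystem fs) {
        // Strip the wrapper before caching; caching the wrapped instance would keep
        // a reference to a registry that is closed once the current checkpoint ends.
        while (fs instanceof SafetyNetWrapper) {
            fs = ((SafetyNetWrapper) fs).delegate;
        }
        CACHE.put(scheme, fs);
    }
}
        {code}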

        Gyula Fora added a comment -

        You can look at all the logs I have posted. For example: https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086#file-gistfile1-txt-L197

        This exact error happens at every single savepoint attempt.

        Stefan Richter added a comment - edited

        Which exception is that exactly? And how does it happen all the time? There is no JIRA for it, because I have not encountered the problem. Can this be reproduced somehow? Is only the first savepoint failing or all? What exceptions still remain after the latest fixes?

        Edit: In a quick run on GCloud, I could not reproduce any problems with savepoints and RocksDB.

        Gyula Fora added a comment -

        Hi Stefan!
        I can confirm that incremental checkpointing and recovery now seem to work as expected. Great job on the fixes!

        Savepointing still gives the same error regardless of whether incremental checkpointing is used or not.
        Is there a JIRA for tracking that issue? I have no idea what might cause it.

        Thank you!
        Gyula

        Stefan Richter added a comment -

        Hi,

        thanks a lot for the testing effort, and yes, there is still a known problem between incremental checkpointing and the ZooKeeperCompletedCheckpointStore. I am working on it.

        Best,
        Stefan

        Gyula Fora added a comment -

        Hi Stefan!

        Thank you for all the work!
        I have tested your changes today, and some of the issues have been resolved.

        Incremental checkpoints now succeed, but they cannot be restored from (either after cancellation or under normal failure scenarios).
        Savepoints still fail with the previous error. There also seems to be some error while discarding subsumed checkpoints. I am attaching the logs containing the errors:

        At the beginning you can see the failed savepoint, then a couple of successful checkpoints with the warnings, and at the end I killed the TM to trigger a restore, which led to file-not-found errors in HDFS:

        https://gist.github.com/gyfora/2bb5569fb703bbd7e47ba60352f90086

        I hope this helps
        Gyula


          People

          • Assignee: Stefan Richter
          • Reporter: Stefan Richter
          • Votes: 0
          • Watchers: 5
