Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.1
    • Component/s: None
    • Labels: None

      Description

      It still isn't feasible to run without an application-level fsync policy. This is a problem because fsync locks the file, and tuning such a policy is very challenging: the flushes must not be so frequent that seeks reduce throughput, yet not so infrequent that each fsync writes so much data that there is a noticeable jump in latency.

      The remaining problem is the way that log recovery works. Our current policy is that if a clean shutdown occurs we do no recovery. If an unclean shutdown occurs we recover the last segment of all logs. To make this correct we need to ensure that each segment is fsync'd before we create a new segment. Hence the fsync during roll.

      Obviously if the fsync during roll is the only time fsync occurs then it will potentially write out the entire segment, which for a 1GB segment at 50MB/sec would take on the order of 20 seconds. The goal of this JIRA is to eliminate this and make it possible to run with no application-level fsyncs at all, depending entirely on replication and background writeback for durability.

      1. KAFKA-615-v8.patch
        62 kB
        Jay Kreps
      2. KAFKA-615-v7.patch
        62 kB
        Jay Kreps
      3. KAFKA-615-v6.patch
        62 kB
        Jay Kreps
      4. KAFKA-615-v5.patch
        62 kB
        Jay Kreps
      5. KAFKA-615-v4.patch
        61 kB
        Jay Kreps
      6. KAFKA-615-v3.patch
        61 kB
        Jay Kreps
      7. KAFKA-615-v2.patch
        60 kB
        Jay Kreps
      8. KAFKA-615-v1.patch
        51 kB
        Jay Kreps

        Activity

        Jay Kreps added a comment -

        Here is a quick thought on how this might work to kick things off. A few things to consider:
        1. Currently recovery works on a per segment basis as the index is recreated from scratch. It will be tricky to do partial segment recovery. I recommend we avoid this. If we want to speed up recovery we can always just make segment sizes smaller.
        2. We need to guarantee that fsync can never happen on the active segment under any circumstances for this to really work--the syncing must be fully async since the fsync's may be very slow. To make this work we need to somehow record what has and has not been fsync'd.
        3. One complicating factor is that an inactive segment may become active once again because of a truncate operation. In this case more messages will be appended and we will need to fsync it again after it is rolled again. If we crash in the intervening time we must not treat the earlier fsync as still covering the segment after the truncation and further appends.

        My thought was to keep a text file containing:
        topic partition offset
        This file would be populated by a background thread that would flush segments and update the file. At recovery time we would use this file to determine the latest flushed segment for each topic/partition and only recover segments newer than this. This would generally mean recovering only the last segment.
        Some notes on maintaining this file:

        • When a segment is flushed we would immediately append a new entry to this file without fsyncing. Doing this as appends means that we need only write out a small bit of data incrementally. Losing an entry is usually okay since it would just mean we do an unnecessary recovery, so a background flush of this file should usually be fine (except in the truncate case noted above).
        • We would keep an in memory map with the latest offset for each topic/partition.
        • To avoid the log growing forever we would periodically write out the full contents of the map to a new file and swap this for the old log.
        • We should probably keep one of these files for each data directory.
        • The case of truncate (i.e. moving the offset backwards) needs to be thought through. In this case I think we may need to immediately fsync the file to avoid a case where we lose an entry in the file and therefore think we have flushed more than we actually have.
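
        Purely to make the above concrete, a rough sketch of such a checkpoint writer might look like this (all names hypothetical, not from any attached patch):

          import java.io._
          import scala.collection.mutable

          // Hypothetical sketch only: one checkpoint file per data directory, appended to
          // by the background flush thread and compacted periodically.
          class FlushCheckpoint(dir: File) {
            private val file = new File(dir, "flush-checkpoint")
            private val flushed = mutable.Map[(String, Int), Long]() // (topic, partition) -> last flushed offset

            // Called by the background thread right after it has fsync'd a segment.
            // Plain append, no fsync of the checkpoint itself: losing an entry only
            // costs an unnecessary recovery.
            def record(topic: String, partition: Int, offset: Long): Unit = synchronized {
              flushed((topic, partition)) = offset
              val out = new FileWriter(file, true)
              try out.write(s"$topic $partition $offset\n") finally out.close()
            }

            def lastFlushed(topic: String, partition: Int): Long = synchronized {
              flushed.getOrElse((topic, partition), -1L)
            }

            // Periodically rewrite the full map into a new file and swap it in,
            // so the append-only checkpoint does not grow forever.
            def compact(): Unit = synchronized {
              val tmp = new File(dir, "flush-checkpoint.tmp")
              val out = new PrintWriter(tmp)
              try flushed.foreach { case ((t, p), o) => out.println(s"$t $p $o") } finally out.close()
              tmp.renameTo(file)
            }
          }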

        We should probably refactor the existing background flush thread so it handles both the time-based flush and the flushing of old segments, just to avoid having two threads, since they play very similar roles. Note that it is fine to have both running, since if the data is already flushed due to a time or interval rule, then the flushing of the old segment can just proceed and will essentially be a no-op (since the file will have no dirty pages).

        I recommend we leave the existing time and interval flush policies in place but now default both to infinity. These will now be rarely used options mostly useful for people who are paranoid, running only a single broker, or using a crappy filesystem where fsync locks everything.

        In terms of code structure it is quite tricky and I don't quite know how to do it. A lot of the challenge is that the flush thread and flush journal file will be global for all logs, but the roll() call is at the log level. The first question is how do we know what segments to flush? One way would be to have the background thread just periodically (say every 30 seconds) scan all logs and see if they have any new segments that need flushing; the downside is that this is O(n) in the number of logs, which perhaps is not ideal (a rough sketch of this option follows). Another way would be to have roll() somehow enqueue a flush operation for the background thread to carry out. The latter case may be more efficient but tangles the log with the layers above it. It would be good to work out the details of how this would work up front.
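
        The scan-based option could look roughly like this (stand-in types, reusing the FlushCheckpoint sketch above; none of these are the real Kafka classes):

          // Stand-in traits for illustration only.
          trait SegmentView { def lastOffset: Long; def flush(): Unit }
          trait LogView {
            def topic: String
            def partition: Int
            def inactiveSegments: Iterable[SegmentView] // every segment except the active one
          }

          // Periodic scan: O(n) in the number of logs; run this from the background
          // thread every 30 seconds or so.
          object RolledSegmentFlusher {
            def flushAll(logs: Iterable[LogView], checkpoint: FlushCheckpoint): Unit =
              for (log <- logs) {
                val flushedUpTo = checkpoint.lastFlushed(log.topic, log.partition)
                for (seg <- log.inactiveSegments if seg.lastOffset > flushedUpTo) {
                  seg.flush() // fsync closed segments only, never the active one
                  checkpoint.record(log.topic, log.partition, seg.lastOffset)
                }
              }
          }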

        Jay Kreps added a comment -

        Attached a draft patch for a first version of this for early feedback. A few details remain to work out.

        This patch removes the per-data-directory .kafka_cleanshutdown file as well as the concept of a "clean shutdown". The concept of clean shutdown is replaced with the concept of a "recovery point". The recovery point is the offset from which the log must be recovered. Recovery points are checkpointed in a per-data-directory file called recovery-point-offset-checkpoint. This uses the normal offset checkpoint file format.
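
        For reference, that checkpoint format is a small text file along these lines (contents illustrative):

          0                       <- format version
          2                       <- number of entries
          my-topic 0 41290        <- topic partition recovery-offset
          my-topic 1 39201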

        Previously we always recovered the last log segment unless a clean shutdown was recorded. Now we recover from the recovery point--which may mean recovering many segments. We do not, however, recover partial segments: if the recovery point falls in the middle of a segment we recover that segment from the beginning.

        On shutdown we force a flush and checkpoint which has the same effect as the cleanshutdown file did before.

        Deleting the recovery-point-offset-checkpoint file will cause a full recovery of your logs on restart, which is kind of a nice feature if you suspect any kind of corruption in the log.

        Log.flush now takes an offset argument and flushes from the recovery point up to the given offset. This allows more granular control to avoid syncing (and hence locking) the active segment.

        Log.roll() now uses the scheduler to make its flush asynchronous. This flush now only covers up to the segment that is just completed, not the newly created segment, so there should be no locking of the active segment any more.
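
        For illustration, a condensed sketch of that shape (simplified stand-in types, not the actual patch code); it also shows why reordered async flushes are harmless: a flush at or below the recovery point is a no-op, and the recovery point only moves forward under a lock.

          import java.util.concurrent.{Executors, ScheduledExecutorService}

          // Illustrative sketch only; names and structure are simplified.
          trait Segment { def baseOffset: Long; def flush(): Unit }

          class LogSketch(initialSegments: Vector[Segment],
                          scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()) {
            @volatile private var segments: Vector[Segment] = initialSegments
            @volatile var recoveryPoint: Long = 0L
            private val lock = new Object

            // Flush up to `offset` and advance the recovery point.
            def flush(offset: Long): Unit = {
              if (offset <= recoveryPoint) return // already covered: reordered async flushes become no-ops
              // flush every segment that starts before `offset`; segments already synced
              // have no dirty pages so re-flushing them is essentially free
              segments.filter(_.baseOffset < offset).foreach(_.flush())
              lock.synchronized {
                if (offset > recoveryPoint) // never move the recovery point backwards
                  recoveryPoint = offset
              }
            }

            // roll(): add the new active segment, then schedule an async flush that only
            // covers up to the start of the new segment, keeping fsync off the append path.
            def roll(newSegment: Segment): Unit = {
              segments = segments :+ newSegment
              scheduler.submit(new Runnable { def run(): Unit = flush(newSegment.baseOffset) })
            }
          }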

        The per-topic flush policy based on # messages and time still remains, but it now defaults to off, so by default we rely only on replication and background writeback for durability.

        I did some preliminary performance testing and we can indeed run with no application-level flush policy with reasonable latency which is both convenient (no tuning to do) and yields much better throughput. I will do more testing and report results.

        Jay Kreps added a comment (edited) -

        New patch with a couple of improvements:
        1. Found and fixed a bug in recovery that led to recovering logs even in the clean shutdown case.
        2. Now we always resize indexes for all segments during recovery as the index size may change. Not doing this was a bug in the previous patch.
        3. I now force a checkpoint of the recovery points in ReplicaManager.becomeLeaderOrFollower() to handle crashes after log truncate.
        4. Added a unit test that intentionally corrupts a log and checks recovery.

        I also did some performance testing on my desktop machine. We can sustain very high throughput, but as we approach the maximum throughput of the drive latency will get worse and worse.

        But as one data point I could do 75MB/sec sustained writes across 500 logs on a single-drive machine that can do a peak of 120MB/sec, with avg write latency of < 1ms and maximum latency of about 350ms.

        Jay Kreps added a comment -

        Patch version v3:

        • Found a call to flush the index in Log.roll(). Removed this.
        Jay Kreps added a comment -

        Rebased patch to trunk.

        Jay Kreps added a comment -

        Attach updated patch v5. Rebased against trunk and with added support for compression in the write throughput test.

        Neha Narkhede added a comment -

        Thanks for all the patches. Overall, it's a good patch. A few minor comments:

        Log

        1. Remove recoverFrom in the comments for Log
        2. Remove count from comments for LogAppendInfo
        3. In recoverLog(), it seems that it makes sense to break out of the while loop as soon as you find a segment that needs to be truncated. Right now, it goes back and tries to iterate through remaining unflushed segments which have been deleted

        Jay Kreps added a comment -

        Updated patch:

        • Removed bad scaladoc
        • Improved log corruption test to cover corruption in a non-final segment to show that the existing logic works

        Actually the recoverLog method is right. It loops through the unflushed segments validating them. When it finds a bad one it truncates to the right position in that segment and then loops over all remaining segments and deletes them. The confusing part, I think, is that unflushed is an iterator, so unflushed.foreach(deleteSegment) actually ends the loop because a postcondition of that is that unflushed.hasNext is false (a toy illustration of the pattern is below). I agree that is kind of tricky. Not sure if there is a more clear way to do that (I tried, that was what I came up with...wish we had break).
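
        A standalone toy that shows the pattern (not the real recoverLog):

          // Draining the iterator inside the loop body makes hasNext false,
          // which is what ends the while loop.
          val unflushed = Iterator(1, 2, 7, 3, 4) // pretend 7 is the first corrupt "segment"
          while (unflushed.hasNext) {
            val seg = unflushed.next()
            if (seg == 7) {
              println(s"truncating within $seg")
              unflushed.foreach(s => println(s"deleting $s")) // consumes the rest; loop exits
            } else {
              println(s"validated $seg")
            }
          }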

        Jun Rao added a comment -

        Thanks for patch v5. Some comments:

        50. Log:
        50.1 recoverLog(): It seems that recoveryPoint can be > lastOffset due to truncation on unclean shutdown. See the comment in 52.2.

        50.2 The comment in the following code is no longer correct since it's not just recovering the active segment. Also, it seems that if we hit the exception, we should delete the rest of the segments after resetting current segment to startOffset.
        } catch {
          case e: InvalidOffsetException =>
            val startOffset = curr.baseOffset
            warn("Found invalid offset during recovery of the active segment for topic partition " + dir.getName + ". Deleting the segment and " +
                 "creating an empty one with starting offset " + startOffset)
            // truncate the active segment to its starting offset
            curr.truncateTo(startOffset)
        }

        50.3 the log flusher scheduler is multi-threaded. I am wondering if that guarantees that the flushes on the same log will complete in recovery point order, which is important?

        51. LogSegment.recover(): the comment for the return value is incorrect. We return truncated bytes, not messages.

        52. ReplicaManager:
        52.1 The checkpointing of recovery point can be done once per LeaderAndIsr request, not per partition.
        52.2 There is this corner case that I am not sure how to handle. Suppose that we truncate a log and immediately crash before flushing the recovery points. During recovery, what can happen is that a recovery point may be larger than logEndOffset. However, the log may need recovery since we don't know whether the flushing of the truncated data succeeded or not. So, perhaps what we can do is that in recoverLog(), if (lastOffset <= this.recoveryPoint), we force recovery of the last segment?

        53. Could you verify that the basic system test works?

        Jay Kreps added a comment -

        Ah thanks for the detailed review:
        50.2 Yes, nice.
        50.3 I thought of this but don't think it is a problem. Flushes are always up to a particular recovery point. So let us say that we are flushing on every offset and flush(100) and flush(101) are reordered since they are async. That is actually okay, flush(100) will not actually write any data, and the check to update the recovery point is always done in a lock to ensure it doesn't get clobbered by out of order flushes. Let me know if you see something I am missing.
        51. Yup.
        52.1 Good point
        52.2 Yeah I thought of this too. My claim is that it is okay as long as the usage is something like (1) stop writes, (2) flush the checkpoints, (3) take new writes. If we do this then there are two cases: (1) the metadata write that truncated the file occurred, or (2) it did not occur. If it did not occur then it is no different than if we crashed prior to the truncate. If it did occur and the log end is before the recovery point then that is still fine because that is stable storage (we just masked off the end of the log) so we don't need to recover it. The troublesome case is if we take writes before flushing the checkpoint; then we are in trouble. The question is whether those assumptions actually hold? However, I think there is at least one bug that you led me to, which is that I need to ensure the recovery point is then reduced to the log end, or else after we append data we would think it was flushed. Let me know if you buy this analysis.
        FWIW I actually think our truncate logic may have a hole in 0.8 because we always recover the last segment only. However consider a case where we truncate off the last segment, making a previous segment active, then we take some writes (but no flush) and then crash. In this case it is possible that the segment we truncated off reappears and also that we have partial writes and old data in the prior segment. But on recovery we will only check the zombie segment and ignore the prior segment.
        One way to simplify some of the reasoning would just be to fsync on truncate, which it doesn't look like we do now. That would help us out of a lot of corner cases. The downside is it may add a lot of time to becomeFollower because of the burst of writes.
        53 Will do

        Let me know what you think about those two discussion points. I would rather fully think this through now than chase one-in-a-million bugs later.

        Jun Rao added a comment -

        50.3 Yes, I think your reasoning is correct. I didn't look at the code carefully enough.

        52.2 For the first part, that was my initial analysis too. Then, I was thinking that the file system has to flush both the metadata and the data. During a crash, could the last segment be in a state where its metadata (and thus length) is flushed, but the actual data is not? Does flush guarantee data is flushed before the metadata? Forcing a flush on every truncate is safe, but will delay the processing of the LeaderAndIsr request and is probably too pessimistic. That's why I was thinking of running recovery on the last segment during startup if lastOffset < this.recoveryPoint.

        For the second part, the hole that you described in current 0.8 won't happen since we force a flush on log rolling.

        Sriram Subramanian added a comment -

        1. Log.scala
        1.1 Doc fix. I see two of these: * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk

        2. ReplicaManager.scala
        2.1 How does this help? We could always crash before calling this and we should still do the right thing on recovery.

        3. TestLogPerformance
        3.1 Should we make the tool consistent like the rest (w.r.t args)?
        3.2 I am assuming we are testing with the default pdflush interval. Should we be able to control the flush interval to test performance?
        3.3 Could you add a comment on what this tool does and how is it different from the LinearWriteTest tool

        Jay Kreps added a comment -

        1.1 Good catch
        2.1 My desire is to ensure that checkpoints are flushed prior to taking new writes.
        3.1 TestLogPerformance is deleted in this patch. Previously TestLogPerformance tested the log and LinearWriteTest tested writing to files. But as I did the latency testing LinearWriteTest got better and better. Finally I just added the log testing into that tool. I deleted it as part of this patch because it was a subset of the functionality in LinearWriteTest.
        3.2 In all our tests we test with whatever filesystem, mount options, and configuration you have. I think trying to control OS-specific things from the tool is overkill.

        Jay Kreps added a comment -

        Jun,
        52.2 In most filesystems there is no guarantee that metadata is flushed before/after/atomically with data. Ext3/4 has some guarantee with data=ordered, but this has other issues and we should not rely on it, and we don't want to require users run a particular mount option with their fs. What I am saying is that I think we cover that case. Casewise:
        1. If the truncation point is after the recovery point, then the recovery point remains valid.
        2. If the truncation point is before the recovery point then the log is stable up to the truncation point. So in a crash either
        a. The metadata flush occurs and the log is correctly truncated, or else
        b. The metadata flush doesn't occur and we regain segments of log. This is just the same as if we hadn't truncated the log. The log contents remain stable.

        What I think is a problematic case is if
        a. We truncate to offset T where T is less than the recovery point R.
        b. We take new writes at offset T, T+1, T+2
        c. Then we checkpoint the recovery point at T+2
        If this occurred then we have messages T, T+1, etc which have not been flushed but which are below the recovery point. The question I was asking is, can this happen? I don't fully understand how the fetching restarts in the leader change so I am not sure.

        Jay Kreps added a comment -

        Okay it looks like the fetch requests are only stopped in Partition.makeFollower, so though my reasoning may be right the assumption is wrong.

        Here is my proposal:
        1. I change this patch to checkpoint the recovery point in Partition.makeFollower. This will be inefficient since we will do this checkpoint once per partition that we become a slave for.
        2. I open a second JIRA to optimize the checkpoint logic.

        Here is the proposal:
        1. We add ReplicaManager.addFetchers and .removeFetchers, which add or remove a bunch of fetchers all at once
        2. We add LogManager.truncateTo(m: Map[TopicAndPartition, Long]). This method will first checkpoint the recovery point, then do the truncates (a rough sketch is below). This is better because it gets the recovery point stuff out of ReplicaManager.
        3. We change ReplicaManager.becomeLeaderOrFollower to stop fetchers for all slaves undergoing the change all at once, do the truncate, then start all those fetchers. This may be faster than what we currently have due to no longer having to fight for the fetcher lock many times while IO and network is happening.
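
        A rough sketch of what item 2 might look like (types simplified and hypothetical, not the eventual patch):

          // Hypothetical sketch: checkpoint the recovery points first, then truncate.
          case class TopicAndPartition(topic: String, partition: Int)
          trait TruncatableLog { def truncateTo(offset: Long): Unit }

          class LogManagerSketch(logs: Map[TopicAndPartition, TruncatableLog],
                                 checkpointRecoveryPoints: () => Unit) {
            def truncateTo(offsets: Map[TopicAndPartition, Long]): Unit = {
              checkpointRecoveryPoints() // write recovery-point-offset-checkpoint before touching the logs
              for ((tp, offset) <- offsets; log <- logs.get(tp))
                log.truncateTo(offset)
            }
          }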

        Jay Kreps added a comment -

        Patch which uses the correct but slow approach of synchronously committing the checkpoint each time we truncate before fetching restarts.

        Sriram Subramanian added a comment -

        Do you not want to reset the recoveryPoint to the logEndOffset on startup? If logEndOffset is less than the recoveryPoint on startup, I think we could end up getting writes to the truncated offsets and we would not flush them. No?

        Jay Kreps added a comment -

        Ack, yes, I did mean to fix the recoveryPoint/logEndOffset issue, I just forgot. Attached v8 which includes that. The fix is as you describe--I just reset the recovery point to the end of the log.
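
        Conceptually the fix amounts to something like this at log load time (a sketch, not the literal patch line):

          // never let a stale recovery point sit beyond the end of the log, otherwise new
          // appends at those offsets would be treated as already flushed
          recoveryPoint = math.min(recoveryPoint, logEndOffset)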

        Sriram Subramanian added a comment -

        +1

        Jay Kreps added a comment -

        Filed KAFKA-1001 to handle the follow-up work to optimize the checkpointing during failover.

        Jay Kreps added a comment -

        Also, Jun, basic system tests all pass with v8.

        Jun Rao added a comment -

        Thanks for patch v8. Just the following comment. Otherwise, +1.

        In 50.2, the warning message still refers to active segment.

        Jay Kreps added a comment -

        Ah, nice catch. Fixed the warning message and committed.


          People

          • Assignee: Jay Kreps
          • Reporter: Jay Kreps
          • Votes: 0
          • Watchers: 5
