KAFKA-77: Implement "group commit" for kafka logs

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.7
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      The most expensive operation for the server is usually the fsync() call that syncs log data to disk; if you don't flush, your data is at greater risk of being lost in a crash. Currently we give two knobs to tune this trade-off: log.flush.interval and log.default.flush.interval.ms (no idea why one has "default" and the other doesn't, since they are both defaults). However, if you flush frequently, say on every write, then performance is not that great.

      One trick that can improve this worst case of continual flushing is to allow a single fsync() to cover multiple writes that occur at around the same time. This is a lot like "group commit" in databases. It is unclear which cases this would improve and by how much, but it might be worth a try.
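As a rough illustration of the trick (a minimal Python sketch, not Kafka code; all class and method names here are hypothetical): several threads append to a shared log, and a single fsync() issued by whichever thread commits first also covers every append made before it, so later committers can return without syncing again.

```python
import os
import threading

class GroupCommitLog:
    """Sketch of group commit: one fsync() can make many appends durable.
    Simplification: the lock is held during fsync, so appends block while
    a flush is in progress (a real implementation would avoid this)."""

    def __init__(self, path):
        self.file = open(path, "ab")
        self.lock = threading.Lock()
        self.written_offset = 0  # bytes appended (possibly only in page cache)
        self.synced_offset = 0   # bytes known to be on disk
        self.fsync_calls = 0

    def append(self, data: bytes) -> int:
        with self.lock:
            self.file.write(data)
            self.written_offset += len(data)
            return self.written_offset

    def commit(self, offset: int) -> None:
        """Block until `offset` is durable; one fsync may cover many appends."""
        with self.lock:
            if offset <= self.synced_offset:
                return  # an earlier committer's fsync already covered us
            self.file.flush()
            os.fsync(self.file.fileno())  # the expensive call being amortized
            self.synced_offset = self.written_offset
            self.fsync_calls += 1
```

Two appends followed by a commit of the later offset leave the earlier offset already durable, so its commit is free.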

        Activity

        Jun Rao added a comment -

        I think what Jay meant is that in 0.8, a message is considered committed as long as it's written to memory on f brokers (f being the replication factor). This is probably as good as or better than forcing data to disk, assuming failures are rare. Therefore, flushing to disk does not need to be optimized for durability guarantees.

        Dave Revell added a comment -

        > This is not really a good idea post 0.8 as we no longer have much dependence on the disk flush.

        Jay, would you mind explaining a bit more? Is there a new feature in Kafka >0.8 that improves durability without the need for disk flushes? Or is there perhaps a new feature that decreases the performance penalty of flushing after every message?

        Jay Kreps made changes -
        Status: Open → Resolved
        Resolution: Won't Fix
        Jay Kreps added a comment -

        This is not really a good idea post 0.8 as we no longer have much dependence on the disk flush.

        Neha Narkhede added a comment -

        I tried running the new producer perf tests on this patch, measuring producer throughput with and without it. Here are the findings:

        group.commit, MB/sec, messages/sec, broker.num.partitions, broker.flush.interval, broker.num.threads, num.producer.threads
        yes, 1.8903, 9910.4099, 1, 1, 8, 8
        no, 1.1037, 5786.3673, 1, 1, 8, 8
        yes, 6.0624, 31784.5769, 1, 1000, 8, 8
        no, 4.9943, 26184.3166, 1, 1000, 8, 8

        Varying the number of threads on the server (this affects the number of writes that can be batched):

        group.commit, MB/sec, messages/sec, broker.num.partitions, broker.flush.interval, broker.num.threads, num.producer.threads

        yes, 2.0313, 10649.6273, 1, 1, 32, 8
        no, 1.1499, 6028.5997, 1, 1, 32, 8
        yes, 6.2151, 32584.9653, 1, 1000, 32, 8
        no, 4.8507, 25431.4445, 1, 1000, 32, 8

        To summarize, it shows at least a 16% improvement (with flush interval 1000 and 8 server threads) and a 38% improvement (with flush interval 1 and 8 server threads).

        Alan Cabrera made changes -
        Workflow: jira → no-reopen-closed, patch-avail
        Jay Kreps added a comment -

        Yeah, fwiw, I consider this a proof of concept to understand the perf impact.

        Chris, yes, flush == FileChannel.force == fsync.

        To clarify: the performance case I am going after is not the one where you have tuned sync() back to a reasonable level; that case we already handle optimally, I think. The case I was targeting is the one where you need to sync on every write for durability (or to reduce consumer latency). In that case I suspect we could batch 30-50% of the flushes, which could help a lot. Since this is a performance optimization, I agree it is only worth it if performance is quite good in the target case and not worse elsewhere. If not, then we will at least have explored the possibility.

        One thing that would greatly help this kind of work would be a sort of canonical performance suite to run against, but making that is more work than this patch...

        I agree that it could be cleaner to have a separate I/O thread pool that handled all writes for each Log in a single-threaded manner off its own write queue. If the performance pans out I will consider this.

        Jun Rao added a comment -

        Chris, I am not sure if we gain much by limiting the number of fsyncs. In a typical scenario in Kafka, most reads are served from the page cache, so the real I/O load on the underlying storage system is fsync. If at a given point in time there is a pending write and no ongoing fsync, we are not fully utilizing the available resources of the storage system. I think a more effective way is to keep flushing in a separate thread. If multiple additional writes accumulate during one flush, the next flush will fsync more data to the storage media in a single call, essentially getting the benefit of group commit. If only one more write has accumulated, syncing it immediately doesn't hurt, since otherwise the storage system would likely be idle.
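The separate-flush-thread design described here can be sketched roughly as follows (illustrative Python, not Kafka code; all names are hypothetical): a single background thread drains whatever has accumulated on a queue and covers the whole drained batch with one fsync(), so writes that pile up during a slow fsync get grouped into the next one.

```python
import os
import queue
import threading

class FlusherThreadLog:
    """Sketch of the separate-flusher-thread idea: one fsync per drained
    batch, so grouping happens automatically under load."""

    def __init__(self, path):
        self.file = open(path, "ab")
        self.writes = queue.Queue()
        self.fsync_calls = 0
        threading.Thread(target=self._flush_loop, daemon=True).start()

    def append(self, data: bytes) -> threading.Event:
        done = threading.Event()  # set once the write is durable
        self.writes.put((data, done))
        return done

    def _flush_loop(self):
        while True:
            batch = [self.writes.get()]     # block until at least one write
            while not self.writes.empty():  # drain everything accumulated
                batch.append(self.writes.get())
            for data, _ in batch:
                self.file.write(data)
            self.file.flush()
            os.fsync(self.file.fileno())    # one fsync for the whole batch
            self.fsync_calls += 1
            for _, done in batch:
                done.set()                  # wake every covered writer
```

A caller appends and then waits on the returned event to know its write is durable; a queue-size bound (as suggested below for the config) is omitted here for brevity.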

        Chris Burroughs added a comment -

        > My preference would be to implement a separate flush thread that constantly obtains dirty file segments from a blocking queue and flushes each of them as fast as possible.

        flush == FileChannel.force == fsync, right? Isn't the point to limit fsyncs to a reasonable rate (not too many per second), not to issue them as fast as possible?

        Jun Rao added a comment -

        Thanks for the patch. The logic in the patch looks correct. I don't know how much benefit we can gain from this. Will wait for the performance numbers.

        My main concern with this patch is that it doesn't remove either of the two existing log flush configs and potentially adds a third one, MaxGroupCommitSize. This seems to complicate the configs further.

        My preference would be to implement a separate flush thread that constantly obtains dirty file segments from a blocking queue and flushes each of them as fast as possible. We can replace the two existing flush configs with a new one that controls the queue size (i.e., # of outstanding flushes).

        Jay Kreps made changes -
        Attachment: kafka-group-commit.patch [ 12488315 ]
        Jay Kreps added a comment -

        A patch that implements group commit for kafka. This implementation is a little complex: the append() method is now a little scary; maybe someone sees a way to do it more simply.

        A couple of notes:

        1. I don't use any separate threads; the actual write is done by one of the writing threads involved in the commit (essentially it is a race, and whoever gets there first does it).

        2. I only try to batch the flush, not the write() call. Batching writes could be done as well, but it would require working around the MessageSet.writeTo interface, since you would then want to write multiple message sets in a single call, which breaks the current abstraction. Also, the write call gives time for more writes to accumulate in the group, so batching it might not help.

        3. I try to limit the group size to some fixed upper limit (50), which I just hard-code. In practice I could not produce groups of more than 3, but I want to guarantee that you can't block the commit forever by queuing up writes under high load.

        This whole idea is really only worth it if there are non-pathological cases where performance gets significantly better and performance doesn't get worse anywhere else.

        I haven't really done any performance testing yet, as my laptop seems to get CPU-bound by the producer perf test process, which means I am having trouble producing an I/O-bound load on one machine. I think I need to run an experiment with more than one producer machine separate from the kafka machine, and perhaps with more than one topic to force seeks when we flush (sequential flushes should be much cheaper, but that would only happen if you had one topic). I will update this bug when I have some real benchmarking.
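As a rough illustration of notes 1 and 3 above (illustrative Python, not the actual patch; all names are hypothetical): each committing thread either wins the race and performs the fsync for the whole group, or waits until the in-progress flush covers its offset.

```python
import os
import threading

class GroupCommitAppend:
    """Sketch of group commit without a separate flush thread: the first
    committer to arrive fsyncs for everyone (note 1). The real patch also
    caps the group size at a hard-coded 50 (note 3); that cap is omitted
    here for brevity."""

    def __init__(self, path):
        self.file = open(path, "ab")
        self.state = threading.Condition()
        self.end = 0             # bytes appended so far
        self.flushed = 0         # bytes known to be fsync'ed
        self.flusher_busy = False
        self.fsync_calls = 0

    def append_and_commit(self, data: bytes) -> None:
        self.state.acquire()
        self.file.write(data)
        self.end += len(data)
        my_offset = self.end
        while self.flushed < my_offset:
            if self.flusher_busy:
                self.state.wait()             # lost the race; winner flushes for us
            else:
                self.flusher_busy = True      # won the race: flush the whole group
                target = self.end             # everything appended up to now
                self.state.release()          # let new appends queue up meanwhile
                self.file.flush()
                os.fsync(self.file.fileno())  # one fsync covers the group
                self.state.acquire()
                self.flushed = target
                self.flusher_busy = False
                self.fsync_calls += 1
                self.state.notify_all()       # wake the threads we covered
        self.state.release()
```

With several concurrent committers, the number of fsync calls ends up between one and the number of threads, depending on how the race plays out.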

        Jay Kreps created issue -

          People

          • Assignee: Jay Kreps
          • Reporter: Jay Kreps
          • Votes: 0
          • Watchers: 3