Kafka / KAFKA-802

Flush message interval is based on compressed message count

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:
      None

      Description

      In Log.append(), we use compressed message count to determine whether to flush the log or not. We should use uncompressed message count instead.
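The distinction matters because a compressed message set appears as a single "shallow" wrapper message while carrying many inner messages. A minimal sketch of the bug, using hypothetical types rather than Kafka's actual classes:

```java
import java.util.List;

// A minimal sketch (hypothetical types, not Kafka's actual classes): a
// compressed "wrapper" message holds several uncompressed inner messages.
public class FlushCountSketch {
    record Message(List<Message> inner) {
        boolean isCompressed() { return inner != null; }
        // Deep count: the number of messages a consumer actually sees.
        int deepCount() {
            return isCompressed()
                ? inner.stream().mapToInt(Message::deepCount).sum()
                : 1;
        }
    }

    static final int FLUSH_INTERVAL_MESSAGES = 4; // illustrative threshold

    public static void main(String[] args) {
        Message plain = new Message(null);
        // One compressed wrapper carrying five uncompressed messages.
        Message wrapper = new Message(List.of(plain, plain, plain, plain, plain));
        List<Message> batch = List.of(wrapper);

        int shallow = batch.size();                                   // 1
        int deep = batch.stream().mapToInt(Message::deepCount).sum(); // 5

        // Bug: the shallow (compressed) count never reaches the threshold.
        System.out.println("shallow triggers flush: " + (shallow >= FLUSH_INTERVAL_MESSAGES));
        // Fix: the uncompressed count crosses it as expected.
        System.out.println("deep triggers flush: " + (deep >= FLUSH_INTERVAL_MESSAGES));
    }
}
```

With a flush interval of 4, a single wrapper holding five messages would never trigger a flush under the shallow count, even though five messages were appended.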

      1. kafka-802-v3.patch
        3 kB
        Neha Narkhede
      2. kafka-802-v2.patch
        2 kB
        Neha Narkhede
      3. kafka-802.patch
        2 kB
        Neha Narkhede

        Activity

        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Jun Rao added a comment -

        Thanks for patch v3. +1.

        Neha Narkhede made changes -
        Attachment kafka-802-v3.patch [ 12573544 ]
        Jun Rao added a comment -

        Thanks for patch v2. We need a few tweaks.

        20. The assignment to firstOffset and the computation of numAppendedMessages have to be done inside the synchronization of the lock.

        21. The following statements in append are no longer valid and can be removed.
        // return the offset at which the messages were appended
        offsets

        Neha Narkhede made changes -
        Attachment kafka-802-v2.patch [ 12573538 ]
        Neha Narkhede added a comment -

        That is a good insight, Jun. Included that fix in v2.

        Jun Rao added a comment - edited

        Sriram, agreed that it's better if the meaning of the flush interval is consistent between the leader and the follower. After thinking a bit more, I think this is doable. Basically, during shallow iteration, the offset returned for a compressed message is the offset of the last uncompressed message in it. So, the last offset is actually correct even if messages are compressed. The problem is with the first offset. I think we can set the first offset to nextOffset at the beginning of append(). This way, whether we need to assign offsets or not, we can always obtain the right first offset. We can then use the difference between the last and first offsets to drive the log flush.
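The offset arithmetic described above can be sketched roughly as follows (hypothetical names; Kafka's real logic lives in kafka.log.Log):

```java
// A minimal sketch of the idea above: capture firstOffset from nextOffset
// at the start of append(), take lastOffset from the last (shallow)
// assigned offset, and derive the uncompressed message count from their
// difference. Hypothetical class, not Kafka's actual Log.
public class OffsetDeltaSketch {
    private long nextOffset = 100L; // next offset to assign in the log

    // Append a batch containing `uncompressedCount` messages (possibly
    // inside one compressed wrapper). Returns the number of messages
    // appended, derived purely from offsets.
    long append(int uncompressedCount) {
        long firstOffset = nextOffset; // captured before assigning offsets
        // Shallow iteration returns the offset of the LAST inner message,
        // so lastOffset is correct even for a compressed wrapper.
        long lastOffset = nextOffset + uncompressedCount - 1;
        nextOffset = lastOffset + 1;
        return lastOffset - firstOffset + 1; // count that drives the flush
    }

    public static void main(String[] args) {
        OffsetDeltaSketch log = new OffsetDeltaSketch();
        System.out.println(log.append(5)); // one wrapper, five inner messages
        System.out.println(log.append(3));
    }
}
```

Because the last offset of a compressed wrapper is the offset of its last inner message, the difference recovers the uncompressed count without deep iteration.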

        Sriram Subramanian added a comment -

        Personally, having different meanings for flushed messages (compressed for the follower vs. uncompressed for the leader) will make it tough to reason about things.

        Neha Narkhede made changes -
        Assignee Neha Narkhede [ nehanarkhede ]
        Neha Narkhede made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Neha Narkhede made changes -
        Field Original Value New Value
        Attachment kafka-802.patch [ 12573530 ]
        Neha Narkhede added a comment -

        Two changes -

        1. Changed the maybeFlush() call from Log.append() to take in the number of appended messages. On the leader, this will be the uncompressed message count, but on the follower, this will still be the compressed message count since we don't recompress data on the follower. I think this is fine.

        2. Changed the default of log.flush.interval.messages to 10000 instead of 500. With replication, a very low default for this config doesn't make sense.
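Change 1 can be sketched roughly as follows (hypothetical names; the real maybeFlush lives in kafka.log.Log): maybeFlush() receives the number of messages just appended and flushes once the unflushed total reaches the configured interval.

```java
// A rough sketch of a count-based maybeFlush(); hypothetical class, not
// Kafka's actual implementation.
public class MaybeFlushSketch {
    // Default raised from 500 to 10000 in this patch.
    static final long LOG_FLUSH_INTERVAL_MESSAGES = 10_000L;

    private long unflushedMessages = 0;
    private int flushes = 0;

    void maybeFlush(long numAppendedMessages) {
        unflushedMessages += numAppendedMessages;
        if (unflushedMessages >= LOG_FLUSH_INTERVAL_MESSAGES) {
            flushes++;            // stand-in for fsync'ing the log segment
            unflushedMessages = 0;
        }
    }

    public static void main(String[] args) {
        MaybeFlushSketch log = new MaybeFlushSketch();
        for (int i = 0; i < 4; i++)
            log.maybeFlush(3_000); // leader passes uncompressed counts
        System.out.println(log.flushes); // 12000 >= 10000, so one flush
    }
}
```

On the leader the caller passes the uncompressed count; on the follower the same call still receives the shallow count, as noted above.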

        Jun Rao added a comment -

        This is easy to fix on the leader since it iterates uncompressed messages to assign offsets. It's a bit hard to fix in the follower. We could iterate uncompressed messages. This will add CPU overhead though.

        Jun Rao created issue -

          People

          • Assignee: Neha Narkhede
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 3
