KAFKA-881

Kafka broker not respecting log.roll.hours

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7.2
    • Fix Version/s: None
    • Component/s: log
    • Labels: None

      Description

      We are running Kafka 0.7.2. We set log.roll.hours=1. I expected that meant logs would be rolled at least every hour. However, sometimes logs that are many hours (sometimes days) old have more data added to them. This perturbs our systems for reasons I won't get into.

      I don't know Scala or Kafka well, but I have a proposal for why this might happen: upon restart, a broker forgets when its log files were first appended to ("firstAppendTime"). Then, a potentially unbounded amount of time later, the restarted broker receives another message for the particular (topic, partition) and starts the clock again. It will then roll that log over after an hour.

      https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala says:

      /* the maximum time before a new log segment is rolled out */
      val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

      https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala has maybeRoll, which requires segment.firstAppendTime to be defined. It also has updateFirstAppendTime(), which sets the timestamp only if it is empty.
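      The behavior described by those two files can be condensed into a small sketch. This is a simplified model, not the actual implementation: the names RollSketch and shouldRoll are illustrative, and the real Log/LogSegment classes carry far more state.

```scala
// Simplified model of the 0.7 rolling logic described above. The names
// Segment, updateFirstAppendTime and shouldRoll mirror the quoted code,
// but the real Log/LogSegment classes carry far more state. The key
// point: firstAppendTime lives only in memory, so a restart clears it.
object RollSketch {
  case class Segment(var firstAppendTime: Option[Long] = None)

  val rollIntervalMs: Long = 60L * 60 * 1000 // log.roll.hours = 1

  // Like updateFirstAppendTime(): set the timestamp only if empty.
  def updateFirstAppendTime(seg: Segment, nowMs: Long): Unit =
    if (seg.firstAppendTime.isEmpty) seg.firstAppendTime = Some(nowMs)

  // Like maybeRoll(): roll only once firstAppendTime is defined and the
  // interval has elapsed. A freshly reopened segment has None here, so
  // the one-hour clock silently restarts after every broker restart.
  def shouldRoll(seg: Segment, nowMs: Long): Boolean =
    seg.firstAppendTime.exists(t => nowMs - t > rollIntervalMs)
}
```

      Under this model, a segment reopened after a restart never rolls until it receives an append and then survives another full interval, which matches the scenario below.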

      If my hypothesis about why this happens is correct, here is a case where rolling takes longer than an hour, even on a high-volume topic:

      • write to a topic for 20 minutes
      • restart the broker
      • wait for 5 days
      • write to a topic for 20 minutes
      • restart the broker
      • write to a topic for an hour

      The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long as you want.

      Proposed solution:

      The easiest thing to do would be to have Kafka re-initialize firstAppendTime with the file creation time. Unfortunately, UNIX does not record a file creation time; the closest is ctime (change time), which is updated whenever a file's inode information changes.

      One solution is to embed the firstAppendTime in the filename (say, seconds since epoch). Then when you open the file you could reset firstAppendTime to exactly what it really was. This ignores clock drift or resets; to guard against those, one could set firstAppendTime to min(filename-based time, current time).
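      A minimal sketch of this first option, assuming a hypothetical "&lt;epochSeconds&gt;.kafka" naming scheme (the object FilenameTime and the suffix are illustrative, not the actual 0.7 file layout):

```scala
// Hypothetical sketch of solution 1: recover firstAppendTime from a
// filename that encodes seconds since the epoch, clamped by the current
// time to guard against clock resets. The "<epochSeconds>.kafka" naming
// is an assumption for illustration only.
object FilenameTime {
  def firstAppendTimeFrom(filename: String, nowMs: Long): Long = {
    val epochSeconds = filename.stripSuffix(".kafka").toLong
    // min(filename-based time, current time), as proposed above
    math.min(epochSeconds * 1000L, nowMs)
  }
}
```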

      A second solution is to make the Kafka log roll over at specific times, regardless of when the file was created. Conceptually, time can be divided into windows of size log.rollover.hours since the epoch (UNIX time 0, 1970). So, when firstAppendTime is empty, compute the next rollover time using integer division, say next = ((hours since epoch) / log.rollover.hours + 1) * log.rollover.hours. If the file's mtime (last-modified time) falls before the current rollover window ((next - log.rollover.hours) .. next), roll it over right away. Otherwise, roll over when you cross next, and recompute next.
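      The window arithmetic of this second option can be sketched as follows (WindowRoll and its method names are illustrative, not part of Kafka):

```scala
// Sketch of solution 2: divide time into fixed windows of rollIntervalMs
// since the epoch and roll whenever an append crosses into a new window.
// This makes roll times independent of when the file was created.
object WindowRoll {
  def window(tMs: Long, rollIntervalMs: Long): Long = tMs / rollIntervalMs

  // Roll now if the segment was last modified in an earlier window.
  def shouldRoll(lastModifiedMs: Long, nowMs: Long, rollIntervalMs: Long): Boolean =
    window(lastModifiedMs, rollIntervalMs) != window(nowMs, rollIntervalMs)
}
```

      With hour-sized windows, a segment last touched at 4:59pm rolls on the first append after 5:00pm, even across a restart, because only the file's mtime and the wall clock are consulted.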

      A third solution (not perfect, but at least an approximation) would be not to write to a segment if firstAppendTime is not defined and the timestamp on the file is more than log.roll.hours old.
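      This third option is a one-line guard; a sketch (StaleGuard and appendAllowed are illustrative names):

```scala
// Sketch of solution 3: refuse to append to a segment whose
// firstAppendTime is undefined (i.e. reopened after a restart) and whose
// file timestamp is older than the roll interval; such a segment would
// be rolled instead.
object StaleGuard {
  def appendAllowed(firstAppendTime: Option[Long],
                    fileMtimeMs: Long,
                    nowMs: Long,
                    rollMs: Long): Boolean =
    firstAppendTime.isDefined || nowMs - fileMtimeMs <= rollMs
}
```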

      There are probably other solutions.

      1. kafka_roll.patch
        6 kB
        Dan F
      2. kafka-roll.again.patch
        5 kB
        Dan F
      3. kafka-roll-0.8.patch
        21 kB
        Sam Meder

        Activity

        Neha Narkhede added a comment -

        +1 on Jun's suggestion.

        Jun Rao added a comment -

        How about the following: in the log, maintain an expectedRollingEpoch, computed from the time of the first append, or from the last-modified time during restart (if the last segment is not empty), divided by rollIntervalMs. Once computed, expectedRollingEpoch doesn't change until the log rolls. We can then compare expectedRollingEpoch with the current time divided by rollIntervalMs to determine if a log needs to be rolled.
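        A sketch of this check, assuming the epoch is simply the timestamp divided by rollIntervalMs (EpochRoll and its names are paraphrased from the comment, not actual Kafka code):

```scala
// Sketch of the suggested check: the expected rolling epoch is fixed at
// first append, or at restart from the segment's mtime if the last
// segment is non-empty, and stays unchanged until the log rolls.
object EpochRoll {
  val rollIntervalMs: Long = 3600L * 1000

  def expectedEpoch(firstAppendOrMtimeMs: Long): Long =
    firstAppendOrMtimeMs / rollIntervalMs

  // Roll once the current time's epoch moves past the expected one.
  def shouldRoll(expected: Long, nowMs: Long): Boolean =
    nowMs / rollIntervalMs != expected
}
```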

        Dan F added a comment -

        Sure enough, that's a race condition. Practically speaking, it never happens, but it's there.

        What do you suggest?

        Jun Rao added a comment -

        Thanks for the patch. I am not sure the patch works, though. In Log.append(), the system time may pass the rolling boundary just after the maybeRoll() call but before segment.append(). That append will then move lastAppendTime into the next rolling interval, so the next append won't trigger a log roll.

        Dan F added a comment -

        Any word on whether this will be accepted into 0.7 and 0.8? It would be comforting to know that if we upgrade to 0.8 we won't have to re-patch it ourselves to keep our systems from breaking.

        Dan F added a comment -

        1. No, I believe that is the bug. lastAppendTime does not persist across restarts.

        2. It might be better for you to do it, since you know which whitespace changes are disturbing you. As far as I know, I made minimal changes that obey the whitespace rules (i.e. closing braces matching open braces in indentation, etc.). I might be wrong.

        Neha Narkhede added a comment -

        Sorry for reviewing this late. Few questions -

        1. Doesn't it suffice to check if(segment.lastAppendTime.isDefined && (time.milliseconds - segment.lastAppendTime.get > rollIntervalMs))
        2. There are many whitespace changes in LogSegment. Do you mind getting rid of those in the next patch ?

        Dan F added a comment -

        Sam: Thanks for that. It would be comforting to know if/when we decide to upgrade to 0.8 that our log rollover still works the way we need it to.

        Sam Meder added a comment -

        Attached a version of this patch that applies against 0.8

        Dan F added a comment -

        I am not necessarily opposed, but I don't understand what "crossed" means. It sounds to me like what is being tested: "((time.milliseconds / rollIntervalMs) != (segment.lastAppendTime.get / rollIntervalMs))".

        I'd be happier with the change as is just because we've already rolled it into production. However, I understand you want the right change for you.

        Jun Rao added a comment -

        Thanks for the patch. Would it be simpler to just test that the file is not empty and the time boundary is crossed?

        Dan F added a comment - edited

        Sure. I did the below, then submitted kafka-roll.again.patch.

        ====================================================

        git clone https://git-wip-us.apache.org/repos/asf/kafka.git official-kafka
        cd official-kafka/
        git co kafka-0.7.2-incubating-candidate-5
        git co -b dan_kafka_881
        patch -p1 < kafka-roll.patch
        git config --global user.name ...
        git config --global user.email ...
        git commit -a -m "Change log to roll when time / rollIntervalMs changes."
        git format-patch kafka-0.7.2-incubating-candidate-5 --stdout > kafka-roll.again.patch
        ./sbt
        update
        test

        ==================================================

        Jun Rao added a comment -

        Sorry for the delay. I can't apply the patch to the 0.7 branch. Could you follow https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow and submit a new patch?

        Dan F added a comment -

        We are going to roll this out. Is anyone willing to look at the patch any time soon? It'd be nice to increase our confidence it is the right thing, and that it will show up in future versions.

        Dan F added a comment -

        Testing on my own server looks pretty good:

        • start server
        • send messages
        • stop server
        • wait for hour to roll over (e.g., 4:59pm to 5:01pm)
        • start server
        • send new message
        • a new file appears
        Dan F added a comment -

        I attach a patch to 0.7.2. It passes the unit tests. I added a unit test for a 2nd log picking up where the first left off.

        I will now start testing my own server, but I'd appreciate a code review, and some idea of whether we can check this into 0.7.2, or whether we'll have to apply it on our own.

        Thanks.

        Dan F added a comment -

        There would be major issues for us switching versions. I am going to try to produce a patch on 0.7.2 and I hope you'll take a look.

        Jun Rao added a comment -

        If we "don't reuse files after a restart", it may cause fragmentation of the log files, since it affects all logs regardless of whether their retention is time-based or how long the retention time is. The second option seems better. Since the major development is on 0.8 now, I suggest that we patch this in trunk instead of 0.7.

        Dan F added a comment -

        Someone pointed out a particularly easy fix: don't reuse files after a restart. Done. I really like that. Simple. Any chance of this happening any time soon?

        Dan F added a comment -

        Note: it would be useful for us to have the files roll over at the top of each hour. Then at 12:01am we know a file at 12:00am will not be written to. Thus, solution #2 is more attractive to us.


          People

          • Assignee:
            Jay Kreps
          • Reporter:
            Dan F
          • Votes:
            0
          • Watchers:
            4
