
KAFKA-188: Support multiple data directories

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels: None

      Description

      Currently we allow only a single data directory. This means that a multi-disk configuration needs to be a RAID array or LVM volume or something like that to be mounted as a single directory.

      For a high-throughput, low-reliability configuration this would mean RAID0 striping. Common wisdom in Hadoop land has it that a JBOD setup, which just mounts each disk as a separate directory and does application-level balancing over them, yields roughly a 30% write improvement. For example, see this claim:
      http://old.nabble.com/Re%3A-RAID-vs.-JBOD-p21466110.html

      It is not clear to me why this would be the case--it seems the RAID controller should be able to balance writes as well as the application, so it may depend on the details of the setup.

      Nonetheless this would be really easy to implement: all you need to do is support multiple data directories and balance partition creation over these disks.

      One problem this might cause is that if a particular topic is much larger than the others it might unbalance the load across the disks. The partition->disk assignment policy should probably attempt to spread each topic evenly to avoid this, rather than just trying to keep the number of partitions balanced between disks.
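
      As a rough illustration, a topic-aware policy could spread each topic's partitions round-robin over the directories. This is a hypothetical sketch, not the implementation that was eventually committed; all names here are made up:

      // Spread each topic's own partitions evenly over the data directories,
      // offsetting by a topic hash so different topics don't all start on the same disk.
      def assignTopic(topic: String, numPartitions: Int, logDirs: Seq[String]): Map[Int, String] = {
        val offset = (topic.hashCode & 0x7fffffff) % logDirs.size
        (0 until numPartitions).map(p => p -> logDirs((p + offset) % logDirs.size)).toMap
      }

      // e.g. assignTopic("big-topic", 5, Seq("/d1", "/d2", "/d3", "/d4"))
      // puts partition 4 back on the disk that already holds partition 0.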

      Attachments

      1. KAFKA-188.patch
        22 kB
        Jay Kreps
      2. KAFKA-188-v2.patch
        42 kB
        Jay Kreps
      3. KAFKA-188-v3.patch
        42 kB
        Jay Kreps
      4. KAFKA-188-v4.patch
        42 kB
        Jay Kreps
      5. KAFKA-188-v5.patch
        49 kB
        Jay Kreps
      6. KAFKA-188-v6.patch
        49 kB
        Jay Kreps
      7. KAFKA-188-v7.patch
        50 kB
        Jay Kreps
      8. KAFKA-188-v8.patch
        55 kB
        Jay Kreps


          Activity

          Jay Kreps added a comment -

          Here are a few additional thoughts on this:
          1. This is actually a lot more valuable after Kafka 0.8 is out, since we will already allow replication at a higher level, so RAID is less desirable. The patch should definitely be on the 0.8 branch, though it will likely be the same for 0.7.
          2. It is worth deciding if we want to support unbalanced disk sizes and speeds. E.g. if you have a 7.2k RPM drive and a 10k RPM drive, will we allow you to balance over these? I recommend we skip this for now; we can always do it later. So, as with RAID, we will treat all drives equally.
          3. I think the only change will be in LogManager. Instead of a single config.logDir parameter we will need logDirs, an array of directories. In createLog() we will need a policy that chooses the best disk on which to place the new log-partition.
          4. There may be a few other places that assume a single log directory; we may have to grep around and check for that. I don't think there is too much else, as everything else should interact through LogManager, and once the Log instance is created it doesn't care where its home directory is.

          One approach to placement would be to always create new logs on the "least loaded" directory. The definition of "least loaded" is likely to be a heuristic. There are two things we want to balance: (1) data size and (2) I/O throughput. If the retention policy is based on time, then size is a good proxy for throughput. However you could imagine having one log with a very small retention size but very high throughput. Another problem is that the usage may change over time, and migration is not feasible. For example a new feature going through a ramped rollout might produce almost no data at first and then later produce gobs of data. Furthermore you might get odd results in the case where you manually pre-create many topics all at once, as they would all end up on whichever directory had the least data.

          I think a better strategy would be to not try to estimate the least-loaded partition and instead just do round-robin assignment (e.g. logDirs(counter.getAndIncrement() % logDirs.length)). The assumption would be that the number of partitions is large enough that each topic has one partition on each disk.

          Either of these strategies has a corner case if all data goes to a single topic (or one topic dominates the load distribution), and that topic has (say) 5 local partitions and 4 data directories. In this case one directory will get 2x the others. However this corner case could be worked around by carefully aligning the partition count and the total number of data directories, so I don't think we need to handle it here.
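
          A minimal sketch of the round-robin option above: the modulo expression is the one quoted in the comment, while the wrapper class and names around it are assumed (and Int overflow of the counter is ignored for brevity):

          import java.io.File
          import java.util.concurrent.atomic.AtomicInteger

          class RoundRobinDirSelector(logDirs: Array[File]) {
            // shared counter; each newly created log takes the next directory in order
            private val counter = new AtomicInteger(0)

            def nextDir(): File = logDirs(counter.getAndIncrement() % logDirs.length)
          }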

          Jonathan Creasy added a comment (edited) -

          I started the implementation and my code looks much like what you have described. I am now at the point of determining which data location to use. I am planning on doing a round-robin assignment for each partition.

          So, with 4 data dirs and the following topic/partition scheme:

          topic1 - 2 partitions
          topic2 - 4 partitions
          topic3 - 1 partition
          topic4 - 2 partitions

          disk1 = topic1/1, topic2/3, topic4/2
          disk2 = topic1/2, topic2/4
          disk3 = topic2/1, topic3/1
          disk4 = topic2/2, topic4/1

          This is a good first step; we may later want to add re-balancing code based on metrics so that the "produced/consumed messages per second" are roughly balanced per disk. That may or may not be feasible and valuable, and isn't really that important in this initial implementation.

          Jonathan Creasy added a comment -

          Sorry, I meant to have this patch in sooner but got quite busy; I expect to have it soon.

          Jay Kreps added a comment -

          Hey Jonathan, I am working on some log stuff, mind if I take a shot at this too while I am in there?

          Jonathan Creasy added a comment (edited) -

          Yes please. I have the code written, and I even tested it a little, but I haven't been able to spend any time on it in a few weeks. It's probably not done yet, but it's close.

          Jay Kreps added a comment -

          Do you want to just upload whatever you've got in whatever state it's in, and I can use that as a starting point?

          Jay Kreps added a comment -

          Okay, took a shot at this. Attached is a draft patch. It needs some tests specific to the log selection.

          There is a hitch, which I discuss below.

          The basic changes are:
          1. Change KafkaConfig.logDir to logDirectories and take a CSV of paths (I still support the old setting as a fallback)
          2. Move the whole clean shutdown marker file thing into the LogManager. I feel it should have always been inside LogManager since it is an implementation detail of log recovery. Now there is one such file per log directory.
          3. Create new logs in the directories in a round-robin fashion. To do this I keep a counter in LogManager that controls the index of the dir in which we will next create a new log. Initialize this to a random dir index. Each time we create a new log we use the dir this points at and then increment it.
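
          A minimal sketch of the CSV-with-fallback handling in point 1; the property names, default value, and helper shown here are assumptions rather than the exact KafkaConfig code:

          import java.util.Properties

          // Prefer the new comma-separated list; fall back to the old single-directory setting.
          def logDirectories(props: Properties): Seq[String] = {
            val raw = Option(props.getProperty("log.dirs"))
              .orElse(Option(props.getProperty("log.dir")))
              .getOrElse("/tmp/kafka-logs")
            raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq
          }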

          The hitch is the high watermark file. Currently we keep it in log.dir. But what directory should we keep it in when there are multiple log directories? I hackily just use the first. However this creates a dependency on the order of the log dirs in the config, which is not ideal. If we sort them and use the dir that is alphabetically first, then adding new directories later will mess it up (whereas if we hadn't sorted we could have tacked them on the end).

          Some options:
          0. What I currently do, just use the first directory for the hwmark file.
          1. Add a new configuration property, metadataDir, and use this for the high watermark file, the clean shutdown marker, and any future persistent thing. The downside of this is that it requires a new config parameter the user has to set up.
          2. Require a top-level directory for all log directories, e.g.
               all_logs/
                 log_dir_1/
                   my_topic-1/
                   my_topic-2/
                 log_dir_2/
                 ...
          Obviously, since the goal is to support multiple independently mounted disks and you might not control where they mount, you might have to use soft links. From a previous life I remember lots of pain related to Java and soft links.

          I would like to get people's feedback on this.

          Jun Rao added a comment -

          Thanks for the patch. A couple of comments:

          1. It's probably better to have a separate high watermark file per dir for partitions assigned to it. That way, if a disk is damaged, we only lose the high watermarks and the data for partitions on that disk.

          2. About data balancing, in the normal case, assigning partitions in a round-robin way to log dirs is fine. However, if a disk is damaged and is replaced, initially there is no data on it. The round-robin approach would mean that the newly replaced disk will always have fewer partitions than other disks. The same issue can occur if a new dir is added in the configuration. An alternative approach is to assign a new partition to the dir with the fewest partitions, which would alleviate this issue.

          Jay Kreps added a comment -

          Yeah, I think I agree. It's a little more complex, but we can do a "least loaded" thing.

          Neha Narkhede added a comment -

          +1 on maintaining one highwatermark file per log directory. This file can contain the highwatermarks for the partitions that live in that log directory. One way of doing this is to maintain the partition->highwatermark mapping per log directory inside the ReplicaManager and then have the checkpointing thread dump that to the respective log directory's .highwatermark file.
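
          A minimal sketch of that per-directory checkpoint: group the in-memory high watermarks by the directory hosting each partition and write one file per directory. The case class, file name, and file format are assumptions for illustration:

          import java.io.{File, PrintWriter}

          case class TopicAndPartition(topic: String, partition: Int)

          // hwms: current high watermark per partition; dirOf: which log directory hosts it
          def checkpointHighWatermarks(hwms: Map[TopicAndPartition, Long],
                                       dirOf: TopicAndPartition => File): Unit = {
            for ((dir, entries) <- hwms.groupBy { case (tp, _) => dirOf(tp) }) {
              val out = new PrintWriter(new File(dir, ".highwatermark"))
              try entries.foreach { case (tp, hw) => out.println(tp.topic + " " + tp.partition + " " + hw) }
              finally out.close()
            }
          }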

          Jay Kreps added a comment -

          Updated patch:
          1. Split HWM file per data directory
          2. Move to a "least partitions" partition assignment strategy
          3. Add a unit test for the assignment strategy

          I think I may have also fixed the transient failure in LogManager.testTimeBasedFlush, though it remains a time-bomb due to its reliance on the scheduler and wall-clock time.

          One thing to think about is that the use of "least loaded" does have a few corner cases of its own. In general it won't differ much from round robin. The case where it will differ is the case where we add a new data directory to an existing server or lose a single data directory on a server. In this case ALL new partitions will be created in the empty data directory until it becomes full. The problem this could create is that any new topics created during this time period will have all partitions assigned to the empty data dir. This may lead to imbalance of load. I think despite this, this strategy is better than (1) round robin, (2) RAID, or (3) something more complicated we might think of now.

          This patch is ready for review.

          Jay Kreps added a comment -

          Wups, missed two minor changes in that last patch.

          Jay Kreps added a comment -

          Attached an updated patch with a few very minor changes:
          1. If there is only a single log directory I skip the least loaded calculation. This calculation iterates over all logs so it could be a bit expensive in cases where we have very many logs (though that should be rare). This makes it so that the case where we have only one directory is no more expensive than it is now.
          2. Fixed a deprecation warning
          3. Fixed an outdated scaladoc comment

          Neha Narkhede added a comment -

          Took a quick look at patch v4. Here are a few review comments -

          1. KafkaConfig

          We should probably raise an error if the same log directory name was specified more than once.

          2. LogManager

          2.1. I see you have a check in createLogIfNotExists to return if the log is already created. I guess this will happen if two threads execute the following at the same time and enter createLogIfNotExists one after the other.
          logs.get(topicAndPartition) match {
          case null => createLogIfNotExists(topicAndPartition)

          I wonder if it is useful to move the lock to getOrCreateLog instead? Also, shouldn't we use the same lock to protect other accesses to the "logs" data structure (getLog(), allLogs() and topics())?

          2.2. Fix typo on nextLogDir "chose the" -> "choose the"

          3. ReplicaManager

          3.1 Does it make sense to handle the absence of a matching Log for a topic partition correctly, instead of assuming the presence of one through the get API on an Option?

          3.2 Nit pick -> "highwater marks" -> "high watermarks" or "highwatermarks" ?

          4. HighwatermarkCheckpoint

          4.1 While you're in there, do you mind changing the following API to take in a map of TopicAndPartition->Long instead ? We've been bitten by scala bugs that don't handle equality on tuples very well.
          def write(highwaterMarksPerPartition: Map[(String, Int), Long])

          Jay Kreps added a comment -

          New patch, rebased and addresses Neha's comments:

          1. Good thought. Added a check in LogManager to detect duplicate data directories. This is not the only bad possibility though: it is possible for another Kafka process to have opened the same data directory. I am not sure what would happen, but something bad. I added a per-data-directory file lock to check for this. This adds a new file, .lock, to each data directory and uses it to do the equivalent of flock (sketched below). This will lock access across processes or within a process.

          2.1. I agree this is a bit roundabout. The reason is that the logs are in a Pool, which is really a ConcurrentHashMap. These are nice as they don't lock the whole hash table on each lookup. So I think although it is a little more verbose it is better as it is, because in the common case (fetching a log) no global lock is needed. This should also make the other accesses threadsafe.

          2.2. Learned to spell Choose.

          3.1. I don't really understand this code that well, so I am not sure. If it is a programming error for there not to be a log present then I would rather leave it (I think you would get the NoSuchElementException and it would be clear what happened). The reason is that adding a match/case statement in the middle of that groupBy is going to make it awfully hard to understand.

          3.2. Fixed, nice catch.

          4.1. Done.

          Also:
          1. Re-arranged methods in LogManager to make a little more sense.
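
          A minimal sketch of the per-data-directory lock described in point 1 above, using java.nio file locking; the class and method names are assumptions, not the committed kafka.utils.FileLock:

          import java.io.{File, RandomAccessFile}
          import java.nio.channels.OverlappingFileLockException

          // Holds an exclusive OS-level lock on <dir>/.lock so no other Kafka process
          // (or thread in this process) can open the same data directory.
          class DataDirLock(dir: File) {
            private val lockFile = new File(dir, ".lock")
            lockFile.createNewFile() // no-op if it exists; the directory itself must already exist
            private val channel = new RandomAccessFile(lockFile, "rw").getChannel

            /** @return true if we got the lock, false if another process or thread holds it */
            def tryLock(): Boolean =
              try {
                channel.tryLock() != null // null means another process holds the lock
              } catch {
                // thrown when the lock is already held within this same JVM
                case _: OverlappingFileLockException => false
              }

            def unlock(): Unit = channel.close() // closing the channel releases the lock
          }

          Handling both the null return and the OverlappingFileLockException matters because, as noted later in this thread, tryLock behaves differently depending on whether the holder is another process or the same JVM.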

          Jun Rao added a comment -

          Thanks for patch v5. Some more comments:

          50. LogManager.nextLogDir(): zeros should only include dirs not already used, right? Currently, it seems to include all log dirs.

          51. ReplicaManager.checkpointHighWatermarks(): When handling a leaderAndIsr request, we first create a partition and then create a local replica (which creates the local log). So, there is a slight possibility that a partition in allPartitions may not have a local log. The simplest way is to ignore such partitions when checkpointing HW.

          52. VerifiableProperties: The following constructor doesn't seem to be used.
          def this() = this(new Properties)

          Jay Kreps added a comment -

          Okay, you and Neha are giving conflicting advice on ReplicaManager. Can I omit the case where there is no replica? That is what is complicating this method, because if there is no replica then will there be a log to get the parent directory from?

          If so then I am left with:

          def checkpointHighWatermarks() {
            val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect {
              case Some(replica) => replica
            }
            val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
            for ((dir, reps) <- replicasByDir) {
              val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
              highWatermarkCheckpoints(dir).write(hwms)
            }
          }

          which is much simpler. lmk.

          Jay Kreps added a comment -

          Okay this patch addresses Jun's comments:

          50. The zeros are actually correct: basically I am initializing every directory's count to 0 and then overwriting it with the real count if there is one (see the sketch below). The goal is to ensure that there is an entry for each directory even if it has no logs (otherwise it would never get any logs assigned). It is possible to do this with some kind of case statement, but I think this is more readable.

          51. Okay I used the logic above. The logic is now slightly different from what was there before. Now I filter any partition which has no replica from the file. I also filter any replica which has no log, though my understanding is that that shouldn't happen.

          52. Left this. The idea is that previously for tests you could do new Properties so it makes sense to be able to do new VerifiableProperties. Not essential so happy either way.
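
          A minimal sketch of the selection logic discussed in point 50: seed every configured directory with a zero count so that empty directories remain candidates, then pick the directory with the fewest logs. The names and shape here are assumed, not the exact LogManager.nextLogDir:

          import java.io.File

          // logDirs: all configured data directories
          // existingLogParents: the parent directory path of every log that already exists
          def nextLogDir(logDirs: Seq[File], existingLogParents: Seq[String]): File = {
            if (logDirs.size == 1) {
              logDirs.head // single-directory case: skip the counting entirely
            } else {
              val zeros  = logDirs.map(d => d.getPath -> 0).toMap
              val counts = existingLogParents.groupBy(identity).map { case (dir, ds) => dir -> ds.size }
              val byDir  = zeros ++ counts // real counts overwrite the zero placeholders
              val (leastLoaded, _) = byDir.minBy { case (_, n) => n }
              new File(leastLoaded)
            }
          }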

          Jun Rao added a comment -

          Our system tests fail with the latest patch.

          python -B system_test_runner.py 2>&1 | tee test.out

          Saw the following in broker log.

          [2012-10-30 07:40:19,682] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
          java.io.IOException: No such file or directory
          at java.io.UnixFileSystem.createFileExclusively(Native Method)
          at java.io.File.createNewFile(File.java:883)
          at kafka.utils.FileLock.<init>(FileLock.scala:12)
          at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64)
          at kafka.log.LogManager$$anonfun$10.apply(LogManager.scala:64)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
          at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
          at kafka.log.LogManager.<init>(LogManager.scala:64)
          at kafka.server.KafkaServer.startup(KafkaServer.scala:60)
          at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
          at kafka.Kafka$.main(Kafka.scala:46)
          at kafka.Kafka.main(Kafka.scala)
          [2012-10-30 07:40:19,683] INFO [Kafka Server 1], shutting down (kafka.server.KafkaServer)

          Jay Kreps added a comment -

          Looks like previously if you configured a data directory that didn't exist we created it for you. This behavior was broken since now we try to acquire a lock first.

          This patch addresses that problem with the following changes:
          0. Rebased
          1. Cleaned up the LogManager initialization so that we create and validate directories before locking them. This fixes the issue.
          2. It looks like the nio FileLock.tryLock has different behaviors depending on whether the lock is held by your process or another process. This could lead to a bad error message if we failed to lock the data directory. Fixed this in our wrapper class.

          Jun Rao added a comment -

          Now, some unit tests fail with exceptions like the following. Most of them seem to be transient, but they show up more frequently now.

          [error] Test Failed: testCleanShutdown(kafka.server.ServerShutdownTest)
          kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-246675. A Kafka instance in another process or thread is using this directory.

          Jay Kreps added a comment -

          This failing test was due to a real bug in LogManager.shutdown that led to the locks not being released. I fixed this bug. In addition:
          1. Rebased again
          2. Cleaned up ServerShutdownTest and added comments, since I spent a lot of time puzzling over this test to figure out what it was doing.
          3. Added Utils.swallow to each shutdown call in KafkaServer.shutdown--otherwise any failure prevents the rest of the shutdown.

          Jun Rao added a comment -

          +1 on patch v8. Thanks.

          Jay Kreps added a comment -

          Checked in rebased version of v8. I did see one failure in the system tests, but chased it down and it was due to KAFKA-593.


            People

             • Assignee: Jay Kreps
             • Reporter: Jay Kreps
             • Votes: 0
             • Watchers: 4
