Cassandra / CASSANDRA-4310

Multiple independent Level Compactions in Parallel

    Details

      Description

      Problem: If you are inserting data into Cassandra and leveled compaction cannot catch up, you will accumulate a lot of files in L0.

      Here is a solution that addresses this and also increases the performance of leveled compaction.

      We can run many compactions in parallel for unrelated data:
      1) Across non-overlapping levels. Example: while an L0 sstable is compacting with L1, we can run compactions in other levels like L2 and L3 if they are eligible.
      2) We can also compact files in L1 that are not participating in the L0 compaction.

      This is especially useful if you are using SSDs and are not bottlenecked by I/O.

      I am seeing this issue in my cluster: there are more than 50k pending compactions while disk utilization stays low (I am using SSDs).
      I have set multithreaded compaction to true and disabled compaction throttling by setting the throughput limit to 0.

      Attachments

      1. 4310-v6.txt (55 kB, Yuki Morishita)
      2. 4310-v5.txt (54 kB, Jonathan Ellis)
      3. 4310-v3.txt (33 kB, Yuki Morishita)
      4. 4310-v2.txt (27 kB, Yuki Morishita)
      5. 4310.txt (25 kB, Yuki Morishita)

        Activity

        sankalp kohli added a comment -

        One of the ways to speed up leveled compaction, especially when using SSDs, is to run compactions in parallel. Here is the algorithm.

        public List<Set<SSTableReader>> getIndependentCompactions() {
            Create a set of sstables called "compacting".
            Create a list of sets of sstables, List<Set<SSTableReader>> compactionList.
            LIMIT_OF_COMPACTIONS = the number of compactions to do in parallel;

            // Ignore L0 for now.
            for (int level = MAX(8) down to 1) {
                while (level is eligible and compactionList.size < LIMIT_OF_COMPACTIONS
                       and we have not checked all sstables in this level) {
                    Select the next sstable from level L and its overlapping sstables in level L+1, like we do now.
                    If the sstables found in the previous step do not overlap with any compacting sstable
                    (use the "compacting" set to store the sstables that are compacting), add them to compactionList.
                    Persist "next" for the next call to this method, like we do now, so that when this level
                    becomes eligible again, it continues from here.
                }

                if (compactionList.size >= LIMIT_OF_COMPACTIONS)
                    return compactionList;
            }

            // Do it for L0.
            // Level 0 is a little different, as its sstables do not have mutual exclusion of keys.
            // So we have to check that two overlapping sstables are not scheduled as two different compactions in parallel.
            Create a set of sstables called L0selected - this will store the sstables used for compactions. We will also use "compacting".
            while (L0 is eligible and compactionList.size < LIMIT_OF_COMPACTIONS
                   and we have not checked all sstables in this level) {
                Find the oldest sstable (as usual) that is not being compacted (by checking the "compacting" set)
                and does not overlap with any sstable in L0selected.
                Find up to 31 sstables in L0 that overlap with the sstable found in the previous step.
                If none of these 32 (31 + 1) sstables overlaps with, or is in, the L0selected set, then:
                    Find the overlapping sstables in L1 for each of the 32 sstables.
                    If no L1 sstable is in the "compacting" set, add them all to compactionList.
                    Also add them all to L0selected.
            }

            return compactionList;
        }

        Now compact each set in the list in parallel.
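
        To make the overlap test above concrete, here is a minimal, self-contained Java sketch of the independence check. SSTableRange and isIndependent are hypothetical names for illustration, not Cassandra's actual classes.

        import java.util.Set;

        // Hypothetical sketch: an sstable is reduced to the inclusive token range [first, last] it covers.
        class IndependenceCheck
        {
            static class SSTableRange
            {
                final long first, last;

                SSTableRange(long first, long last)
                {
                    this.first = first;
                    this.last = last;
                }

                boolean overlaps(SSTableRange other)
                {
                    return first <= other.last && other.first <= last;
                }
            }

            // A candidate compaction is independent if none of its sstables overlaps any sstable
            // that is already part of a running (or already selected) compaction.
            static boolean isIndependent(Set<SSTableRange> candidates, Set<SSTableRange> compacting)
            {
                for (SSTableRange candidate : candidates)
                    for (SSTableRange busy : compacting)
                        if (candidate.overlaps(busy))
                            return false;
                return true;
            }
        }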

        Jonathan Ellis added a comment -

        The problem is that for common workloads we expect most L0 sstables to overlap with all L1 sstables. So there's very limited parallelism you can introduce in the L0 -> L1 stage, which is the biggest bottleneck.

        sankalp kohli added a comment -

        What you are saying is true, but the improvement I am proposing covers more than this. It also runs compactions in parallel across different levels, as well as multiple compactions per level, so it will definitely speed things up. It is quite frustrating to see the disk not being fully utilized when you are using SSDs.

        Also, as you said, L0->L1 is the biggest bottleneck, and this helps it in a way. Today, when L0 (32 sstables) gets merged with L1, L1 then merges with L2 and so on; while, say, an L3->L4 compaction is running, an L0->L1 compaction won't happen even though it could. With this change, you will be running an L0->L1 compaction almost every cycle unless an L1->L2 compaction is happening.

        So this solution cannot parallelize L0->L1 itself, but it still helps, since L0->L1 runs almost every time and no longer gets blocked by compactions in higher levels.

        Jonathan Ellis added a comment -

        That would be a good improvement to make.

        Yuki Morishita added a comment -

        Initial patch attached to get feedback (still writing unit tests for this).

        I introduced max_concurrent_tasks as a new compaction option of LeveledCompactionStrategy (default is 1) to specify the number of tasks to run in parallel.

        The list of sstable sets is chosen by an algorithm similar to the one above, in LeveledManifest#getCompactionCandidates.
        If there are 2 or more sets to compact, a ParallelLeveledCompactionTask is created with its own executor and performs the compactions in parallel.

        I feel I need some unit tests for the sstable-list selection, so the patch is still in progress. Will update when it's done.
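
        For illustration, a rough sketch of how such a strategy option could be parsed from the Map<String, String> of compaction options; the class and helper here are hypothetical stand-ins, not the actual patch.

        import java.util.Map;

        // Hypothetical sketch of reading the max_concurrent_tasks compaction option
        // (default 1) from the strategy's option map.
        class ParallelismOption
        {
            static final String MAX_CONCURRENT_TASKS = "max_concurrent_tasks";
            static final int DEFAULT_MAX_CONCURRENT_TASKS = 1;

            static int maxConcurrentTasks(Map<String, String> options)
            {
                String value = options.get(MAX_CONCURRENT_TASKS);
                if (value == null)
                    return DEFAULT_MAX_CONCURRENT_TASKS;
                try
                {
                    return Math.max(1, Integer.parseInt(value));
                }
                catch (NumberFormatException e)
                {
                    // Fall back to the default rather than failing strategy creation.
                    return DEFAULT_MAX_CONCURRENT_TASKS;
                }
            }
        }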

        Jonathan Ellis added a comment -

        If there are 2 or more sets to compact, a ParallelLeveledCompactionTask is created with its own executor and performs the compactions in parallel

        This looks unnecessary; we already have CompactionExecutor created with the correct number of threads (concurrent_compactors).

        Unfortunately I think I took a step backwards in CASSANDRA-2407, which is where we changed the API from List<CompactionTask> getBackgroundTasks to CompactionTask getNextBackgroundTask.

        The latter made things more serial deliberately, since 2407 was trying to make STCS finish off small buckets before working on larger ones. But this now looks like optimizing for the wrong thing.

        That said, I'm not sure we need to switch back to the List API, since either way we need to make the candidate generation aware of what is already being compacted (submitBackground can get called for multiple flushes before it's done with the first set). So what I would propose is to make CM.submitBackground loop until (see the sketch after this list):

        • there are no more idle executor threads, or
        • gNBT returns null
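
        A hypothetical sketch of that loop, using stand-in types rather than the real CompactionManager internals (only ThreadPoolExecutor is the actual JDK class; "idle threads" is approximated via the active count).

        import java.util.concurrent.ThreadPoolExecutor;

        // Keep handing out tasks while there are idle compaction threads and the strategy
        // still has an eligible candidate; stop as soon as getNextBackgroundTask() returns null.
        class SubmitBackgroundSketch
        {
            interface Strategy
            {
                Runnable getNextBackgroundTask(); // returns null when nothing is eligible
            }

            static void submitBackground(Strategy strategy, ThreadPoolExecutor executor)
            {
                // Approximate "idle threads" via the executor's active count.
                while (executor.getActiveCount() < executor.getMaximumPoolSize())
                {
                    Runnable task = strategy.getNextBackgroundTask();
                    if (task == null)
                        break; // nothing eligible right now; try again after the next flush or compaction
                    executor.execute(task);
                }
            }
        }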
        Jonathan Ellis added a comment -

        there are no more idle executor threads

        Worth elaborating that we have slightly conflicting goals here:

        1. we want to keep all concurrent_compactors threads busy
        2. but we don't want to enqueue tasks farther in advance than we need to, since more information can become available in the meantime (other flushes or compactions finishing) that allows us to create a more optimal task

        So I think what we want is to track "currently being compacted CFs" in CompactionManager. If a CF is currently being compacted, and there are no idle threads, submitBackground can be a no-op; we can wait for the current compaction to finish and re-submit when more information is available. Otherwise, we should submit at least one task to prevent starvation by busier CFs, and more if there are idle threads still.
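
        A hypothetical sketch of that bookkeeping, extending the loop sketched earlier; the Cf interface and method names are stand-ins, not the real CompactionManager API.

        import java.util.Set;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ThreadPoolExecutor;

        // Track which CFs are currently being compacted; skip submission only when the CF is
        // already compacting AND there are no idle threads, otherwise submit at least one task.
        class CompactingCfsSketch
        {
            interface Cf
            {
                Runnable nextBackgroundTask(); // returns null when nothing is eligible
            }

            private final Set<Cf> compactingCfs = ConcurrentHashMap.newKeySet();
            private final ThreadPoolExecutor executor;

            CompactingCfsSketch(ThreadPoolExecutor executor)
            {
                this.executor = executor;
            }

            void submitBackground(Cf cfs)
            {
                boolean idleThreads = executor.getActiveCount() < executor.getMaximumPoolSize();

                // Already compacting and every thread busy: no-op; re-submit later with fresher information.
                if (compactingCfs.contains(cfs) && !idleThreads)
                    return;

                // Submit at least one task so busier CFs cannot starve this one,
                // then keep going while there are idle threads and eligible candidates.
                do
                {
                    Runnable task = cfs.nextBackgroundTask();
                    if (task == null)
                        break;
                    compactingCfs.add(cfs);
                    executor.execute(task);
                }
                while (executor.getActiveCount() < executor.getMaximumPoolSize());
            }

            // Called by the compaction task when it finishes (e.g. from its outermost finally block).
            void onCompactionFinished(Cf cfs)
            {
                compactingCfs.remove(cfs);
            }
        }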

        Yuki Morishita added a comment -

        v2 attached.

        In this version, LeveledCompactionTask can run in parallel using CompactionExecutor.
        CM#submitBackground submits a task when the submitted CF is not currently compacting, and it fills up the remaining threads with compaction tasks when there is room.

        Jonathan Ellis added a comment -

        Thanks, this did end up a lot simpler IMO. Comments:

        • Shouldn't "skip submitBackground" logic be "CF is currently compacting, AND no more idle threads?"
        • Looks like compactingCF.remove(cfs); could be moved to the topmost finally block of run() (see the sketch after this list)
        • We already track compacting sstables in DataTracker.View; seems like we shouldn't need to duplicate this in LeveledManifest. (I note that getting this correct in DT took some effort so I am worried on that level as well as abstract code purity.) If the problem is that getNextBackgroundTask is called multiple times before CompactionManager officially marks the sstables as in-progress, my preferred solution would be to move the in-progress code earlier.
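
        A hypothetical sketch of the second point, with stand-in types; the idea is simply that the outermost finally of run() clears the marker on every exit path.

        import java.util.Set;

        // Stand-in for the background compaction runnable; only the try/finally shape matters here.
        class BackgroundCompactionTaskSketch implements Runnable
        {
            private final Object cfs;               // stand-in for the ColumnFamilyStore
            private final Set<Object> compactingCF; // shared "currently compacting CFs" set

            BackgroundCompactionTaskSketch(Object cfs, Set<Object> compactingCF)
            {
                this.cfs = cfs;
                this.compactingCF = compactingCF;
            }

            public void run()
            {
                try
                {
                    // ... perform the compaction ...
                }
                finally
                {
                    compactingCF.remove(cfs); // always runs, even if the compaction throws
                }
            }
        }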
        Yuki Morishita added a comment -

        V3 attached.

        Shouldn't "skip submitBackground" logic be "CF is currently compacting, AND no more idle threads?"

        Yes. Fixed this.

        Looks like compactingCF.remove(cfs); could be moved to the topmost finally block of run()

        Done.

        We already track compacting sstables in DataTracker.View; seems like we shouldn't need to duplicate this in LeveledManifest. (I note that getting this correct in DT took some effort so I am worried on that level as well as abstract code purity.) If the problem is that getNextBackgroundTask is called multiple times before CompactionManager officially marks the sstables as in-progress, my preferred solution would be to move the in-progress code earlier.

        I modified the code to mark sstables as compacting before returning the compaction task, so that I can use DataTracker.View's compacting property. If nothing can be marked, no task is returned.

        I also cleaned up code that is no longer used.
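
        A hypothetical sketch of that ordering, with Tracker and Strategy as stand-ins for DataTracker and the real strategy code: candidates are atomically marked first, and if marking fails the call returns no task.

        import java.util.Set;

        class MarkBeforeReturnSketch
        {
            interface Tracker
            {
                boolean markCompacting(Set<String> sstables); // true only if none were already marked
            }

            interface Strategy
            {
                Set<String> chooseCandidates();               // empty when nothing is eligible
                Runnable createTask(Set<String> sstables);
            }

            static Runnable getNextBackgroundTask(Strategy strategy, Tracker tracker)
            {
                Set<String> candidates = strategy.chooseCandidates();
                // Mark first; if another caller already grabbed any of these sstables, return no task.
                if (candidates.isEmpty() || !tracker.markCompacting(candidates))
                    return null;
                return strategy.createTask(candidates);
            }
        }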

        Jonathan Ellis added a comment - edited

        Looking good.

        Pushed v4 to https://github.com/jbellis/cassandra/branches/4310-4 with some changes:

        • made getNextBackgroundTask synchronized. This lets us not worry about subsets in markCompacting (which we were handling incorrectly for user-defined compactions – we want to fail if we can't do exactly what the user asks for)
        • made L0 do a best-effort job when L1 compactions get in the way of what it would prefer to do (this includes the CASSANDRA-4778 fix)
        • skipped the initial overlapping-with-compaction difference in the > L0 case, since it gets included in the check after the union with the overlapping sstables in L+1
        • added some @VisibleForTesting annotations

        Also did some more cleanup of forceDeserialize and isCompactionInteresting in a separate commit.

        Yuki Morishita added a comment -

        Jonathan,

        Thanks for the review.
        But LeveledCompactionStrategyTest is failing due to an AssertionError.
        Commented on github: https://github.com/jbellis/cassandra/commit/a2e6e1ac90f99755400c5d3c67a04efd50e1f15d

        Cleanup looks good.

        Jonathan Ellis added a comment -

        Squashed v5 attached with fixes.

        Yuki Morishita added a comment -

        v5 simplifies a lot.

        There is one thing we need to fix.
        The test I added in v2 (LCSTest#testParallelLeveledCompaction) should generate more SSTables first in order to run compactions in parallel (right now it's 20, which is not enough to fill the compaction threads).

        When I run that test alone with 128 SSTables (so there are 128 sstables in L0 only), leveled compaction produces the following error.

        12/10/10 10:23:12 ERROR compaction.LeveledManifest: At level 1, SSTableReader(path='build/test/cassandra/data/Keyspace1/StandardLeveled/Keyspace1-StandardLeveled-ia-186-Data.db') [DecoratedKey(Token(bytes[3735]), 3735), DecoratedKey(Token(bytes[3738]), 3738)] overlaps SSTableReader(path='build/test/cassandra/data/Keyspace1/StandardLeveled/Keyspace1-StandardLeveled-ia-179-Data.db') [DecoratedKey(Token(bytes[3737]), 3737), DecoratedKey(Token(bytes[3830]), 3830)].  This is caused by a bug in Cassandra 1.1.0 .. 1.1.3.  Sending back to L0.  If you have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable
        

        So we have to check candidates against compacting L0 sstables to avoid generating overlapping sstables in L1.
        Pushed fix to github: https://github.com/yukim/cassandra/commits/4310-6
        Diff from v5: https://github.com/yukim/cassandra/commit/a21fc62ac545ff6f07bcebf1ad4732ee9c505657

        Jonathan Ellis added a comment -

        Good catch. Comments:

        • Is filtering out the compacting L1 sstables correct? If they are being promoted to L2 then great, but if they are part of what is happening in L0 then you still get overlaps
        • You can use Predicates.in instead of an anonymous Predicate (see the sketch after this list)
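
        For illustration, a small self-contained example of that Guava idiom (the sstable names here are made up):

        import java.util.Arrays;
        import java.util.List;
        import java.util.Set;

        import com.google.common.base.Predicates;
        import com.google.common.collect.ImmutableSet;
        import com.google.common.collect.Iterables;

        // Predicates.in(collection) replaces a hand-written anonymous Predicate when
        // filtering candidates against the set of sstables that are already compacting.
        class PredicatesInExample
        {
            public static void main(String[] args)
            {
                Set<String> compacting = ImmutableSet.of("sstable-2", "sstable-5");
                List<String> candidates = Arrays.asList("sstable-1", "sstable-2", "sstable-3");

                // Keep only candidates that are NOT already compacting.
                Iterable<String> available =
                    Iterables.filter(candidates, Predicates.not(Predicates.in(compacting)));

                System.out.println(Iterables.toString(available)); // [sstable-1, sstable-3]
            }
        }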
        Yuki Morishita added a comment -

        Is filtering out the compacting L1 sstables correct? If they are being promoted to L2 then great, but if they are part of what is happening in L0 then you still get overlaps

        You are right. Since we don't know which sstable goes to which level, I reverted that part.

        You can use Predicates.in instead of an anonymous Predicate

        Fixed.

        Update here: https://github.com/yukim/cassandra/commit/af8df59aedda3c99bd1249dbb86572b89c333117

        Jonathan Ellis added a comment -

        I think we still need the check for "are any of the L1 tables overlapped by my candidates busy compacting," which I was doing with the intersection check.
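
        For illustration, a minimal self-contained sketch of that check using Guava's Sets.intersection (the sstable names are made up):

        import java.util.Set;

        import com.google.common.collect.ImmutableSet;
        import com.google.common.collect.Sets;

        // The L1 sstables overlapped by the L0 candidates must not intersect the set of
        // sstables that are already compacting; if they do, these candidates must be skipped.
        class OverlapBusyCheck
        {
            static boolean overlappedL1IsBusy(Set<String> overlappedL1, Set<String> compacting)
            {
                return !Sets.intersection(overlappedL1, compacting).isEmpty();
            }

            public static void main(String[] args)
            {
                Set<String> overlappedL1 = ImmutableSet.of("L1-10", "L1-11");
                Set<String> compacting = ImmutableSet.of("L1-11", "L0-3");
                System.out.println(overlappedL1IsBusy(overlappedL1, compacting)); // true: skip these candidates
            }
        }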

        Yuki Morishita added a comment -

        Looks like we need that too.
        Brought back the intersection check and attached v6.

        Diff from last commit is here: https://github.com/yukim/cassandra/commit/22467049e7fb7f3996fe6dafa0213dd2f144bca6

        Jonathan Ellis added a comment -

        ship it!

        Yuki Morishita added a comment -

        Committed.


          People

          • Assignee: Yuki Morishita
          • Reporter: sankalp kohli
          • Reviewer: Jonathan Ellis
          • Votes: 0
          • Watchers: 5
