Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Hi,

      Following the dev list discussion (1), I've patched the CompactionManager to allow parallel compaction.

      Mainly, it splits the sstables to compact into the desired number of buckets, configured by a new parameter, compaction_parallelism, with a current default of "1".
      Then it submits the units of work to a new executor and waits for them to finish.
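
      A minimal sketch of that shape, assuming hypothetical bucket/compactBucket names rather than the actual patch code: the pool is sized by compaction_parallelism, each bucket becomes one unit of work, and the caller blocks until every unit completes.

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;

      class ParallelCompactionSketch
      {
          static void compactInParallel(List<List<String>> buckets, int compactionParallelism)
              throws InterruptedException, ExecutionException
          {
              ExecutorService executor = Executors.newFixedThreadPool(compactionParallelism);
              try
              {
                  List<Future<?>> units = new ArrayList<>();
                  for (List<String> bucket : buckets)
                      units.add(executor.submit(() -> compactBucket(bucket))); // one unit of work per bucket
                  for (Future<?> unit : units)
                      unit.get(); // wait for the finalization of every unit
              }
              finally
              {
                  executor.shutdown();
              }
          }

          private static void compactBucket(List<String> sstablePaths)
          {
              // placeholder: merge the sstables in this bucket into a new one
          }
      }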

      The patch was created against trunk, so I don't know the exact affected version; I assume it is 0.8.
      I'll also try to apply this patch to 0.6.x for my current production installation, and then reattach it.

      (1) http://markmail.org/thread/cldnqfh3s3nufnke

      1. compactionPatch-V2.txt
        16 kB
        Germán Kondolf
      2. 1876-reformatted.txt
        15 kB
        Jonathan Ellis
      3. compactionPatch-V3.txt
        17 kB
        Germán Kondolf

          Activity

          Germán Kondolf added a comment -

          Patching of:

          • CompactionManager.java
          • Config.java
          • DatabaseDescriptor.java
          • cassandra.yaml
          Jonathan Ellis added a comment -

          Thanks for the patch!

          Attaching patch w/ CompactionManager changes reformatted to illustrate http://wiki.apache.org/cassandra/CodeStyle.

          There are two main things I would like to fix here:

          • it would be cleaner to make the existing executor multithreaded, rather than adding a second with somewhat cumbersome coordination
          • this basically sub-bucketizes the buckets we've decided to compact into smaller groups, which means we're going to have to re-compact the sub-buckets again very soon. This is a very high penalty to pay. (If that's what we want, then a user can do it within the existing framework by lowering MaximumCompactionThreshold, if we modify submitMinorIfNeeded to submit multiple buckets' worth. Either way, we should not re-group a second time outside of submitMinorIfNeeded.)

          Ideally we would parallelize within a single sstable (breaking out the deserialize / merge / write stages), but this is Hard.

          Germán Kondolf added a comment -

          I didn't have in mind that those new SSTables would be re-compacted again so soon. That only happens if the sstable count is still high; if it gets below the minimum it won't be a real problem. In fact, I think that with this patch we should raise the minimumCompactionThreshold.

          On the other hand, maybe the approach I commented on in the mail thread would be worth trying.

          Link: http://markmail.org/message/d2uh4mu5qnzm456w

          Could we just compact the indexes and filters and leave the data alone while we're doing minor compactions?
          It changes the storage structure a bit, and there won't be a direct relation between the 3 parts of the SSTable, but in memory you would reduce the number of filters to check.

          LogicSSTable could have:

          • IndexFile
          • FilterFile
          • SSTableFile[]

          And the IndexFile structure would also record which file holds each row.

          The drawback of this is that expired items won't be removed from the SSTables; instead we simply won't index and filter them, keeping the memory model efficient while we're receiving new items.
          The major compaction will have to do the real "housekeeping", and that kind of compaction won't produce LogicSSTables.
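
          A purely illustrative sketch of that layout; none of these types exist in Cassandra, and LogicSSTable, IndexEntry and the field names are hypothetical restatements of the list above.

          import java.io.File;

          class LogicSSTable
          {
              static class IndexEntry
              {
                  String rowKey;
                  int dataFileId; // which underlying data file holds the row
                  long offset;    // position of the row inside that file
              }

              File indexFile;   // merged index: one IndexEntry per live row
              File filterFile;  // merged bloom filter covering all live rows
              File[] dataFiles; // the original, unmodified SSTable data files
          }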

          Jonathan Ellis added a comment -

          I don't think that's going to be much of a win – checking the filter is the fast part; it's merging different row versions that is expensive.

          Carlos Alvarez added a comment -

          In the case depicted by Germán, we could have a couple of thousand small sstables but a global, 'umbrella', index. This 'umbrella' index would be compacted, so a lookup for any key would have to be performed only once per group of sstables under an umbrella index.

          We think that this way we would have a compacted row (because there is only one pointer in the umbrella index: a record in this index would have an sstable identifier and an offset inside that sstable) but without the cost of writing billions of unchanged rows to a new file (the cost of reading the entire sstables still remains).

          Suppose there are 8 sstables to compact in one bucket. The process would look like:

          • Open the iterator to read all the columns in all the sstables
          • Reduce the columns as it is done in the current version
          • Just write the sstable id and offset of this column to a new index (global to all eight sstables being 'semicompacted') and update the new global bloom filter.

          The new 'compacted' sstable is just one (umbrella) index, one bloom filter and eight sstables. Reading a column would imply (sketched after this list):

          • Check the bloom filter (assume a true positive)
          • Find the value in the index (there is an increased cost because the index pointers would include the sstable id)
          • Find the value in the corresponding sstable.
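
          A rough sketch of that read path under those assumptions; every type here (Filter, IndexEntry, DataFile) is hypothetical, not a Cassandra API.

          import java.util.Map;

          class UmbrellaReadSketch
          {
              static class IndexEntry { int sstableId; long offset; }

              interface Filter { boolean mightContain(String key); }
              interface DataFile { byte[] readRowAt(long offset); }

              static byte[] read(String key,
                                 Filter umbrellaFilter,
                                 Map<String, IndexEntry> umbrellaIndex,
                                 Map<Integer, DataFile> dataFiles)
              {
                  if (!umbrellaFilter.mightContain(key))
                      return null;                           // bloom filter: definitely not present
                  IndexEntry entry = umbrellaIndex.get(key); // index pointer carries (sstable id, offset)
                  if (entry == null)
                      return null;                           // bloom filter false positive
                  return dataFiles.get(entry.sstableId)      // read from the one data file holding the row
                                  .readRowAt(entry.offset);
              }
          }
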
          Jonathan Ellis added a comment -

          If I'm understanding correctly, that means you'd still need to do 8 seeks post-compaction to read the entire row, except in the edge case of each update modifying every column.

          Germán Kondolf added a comment -

          In the minor logical compaction process we reduce the column versions and write only the latest version to the new index and filter; this will leave orphan blocks inside the immutable SSTables, which will be cleared in a major compaction.

          Jonathan Ellis added a comment -

          The sstable-level index only has row keys, not column names, and the row-level index has a summary of column names, not a complete list. (If you were going to make it complete, you might as well include the values with it... and now you've done a normal, merging compaction.)

          Germán Kondolf added a comment -

          We certainly didn't see that; perhaps because we don't know the code deeply, the "logic" SSTable seemed simple and feasible.
          Thanks for your feedback; if we had pursued this path we could have ended up in the same place where we started, like you said.

          It is still noisy that you pass the same data from one SSTable to another several times a day.

          Regards,
          Germán

          Jonathan Ellis added a comment -

          I agree. We have some ideas on CASSANDRA-1608 for how to improve that.

          Germán Kondolf added a comment -

          Regarding the original issue, I've re-patched the trunk version to sync with the new functionality of keeping sstable cached keys after compaction.

          Jonathan Ellis added a comment -

          A while ago I said:

          Ideally we would parallelize within a single sstable (breaking out the deserialize / merge / write stages), but this is Hard.

          It's hard, but for a lot of users (anyone where a single CF holds the bulk of the data) this is the only kind of optimization that will make a difference.

          There are five stages: read, deserialize, merge, serialize, and write. We probably want to continue doing read+deserialize and serialize+write together, or you waste a lot of time copying to/from buffers.

          So, what I would suggest is: one thread per input sstable doing read + deserialize (a row at a time). One thread merging corresponding rows from each input sstable. One thread doing serialize + writing the output. This should give us between 2x and 3x speedup (depending on how much doing the merge on a different thread from the write saves us).

          This will require roughly 2x the memory, to allow the reader threads to work ahead of the merge stage. (I.e. for each input sstable you will have up to one row in a queue waiting to be merged, and the reader thread working on the next.) Seems quite reasonable on that front.

          Multithreaded compaction should be either on or off. It doesn't make sense to try to do things halfway (by doing the reads with a threadpool whose size you can grow/shrink, for instance): we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads. (If this is a concern, we already have a tunable to limit the number of sstables merged at a time in a single CF.)

          IMO it's acceptable to punt completely on rows that are larger than memory, and fall back to the old non-parallel code there. I don't see any sane way to parallelize large-row compactions.
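
          A minimal sketch of that staged layout, assuming hypothetical Row/RowSource/RowMerger types and a purely positional merge (a real merge would align rows by key across the inputs); it only illustrates the thread-and-bounded-queue structure, not Cassandra's actual compaction code.

          import java.util.ArrayList;
          import java.util.Iterator;
          import java.util.List;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;
          import java.util.function.Consumer;

          class CompactionPipelineSketch
          {
              interface Row {}
              interface RowSource { Row next(); }                // read + deserialize one row; null at end
              interface RowMerger { Row merge(List<Row> rows); } // merge corresponding rows from the inputs

              static final Row END = new Row() {};               // end-of-stream sentinel

              static void run(List<RowSource> inputs, RowMerger merger, Consumer<Row> serializeAndWrite)
                  throws InterruptedException
              {
                  List<BlockingQueue<Row>> readQueues = new ArrayList<>();
                  BlockingQueue<Row> writeQueue = new ArrayBlockingQueue<>(1);

                  // Stage 1: one reader thread per input sstable, each at most one row ahead of the merge.
                  for (RowSource source : inputs)
                  {
                      BlockingQueue<Row> q = new ArrayBlockingQueue<>(1);
                      readQueues.add(q);
                      start(() -> {
                          for (Row row = source.next(); row != null; row = source.next())
                              q.put(row);
                          q.put(END);
                      });
                  }

                  // Stage 3: one thread doing serialize + write.
                  Thread writer = start(() -> {
                      for (Row row = writeQueue.take(); row != END; row = writeQueue.take())
                          serializeAndWrite.accept(row);
                  });

                  // Stage 2: merge on this thread (simplified to positional merging).
                  List<BlockingQueue<Row>> active = new ArrayList<>(readQueues);
                  while (!active.isEmpty())
                  {
                      List<Row> heads = new ArrayList<>();
                      for (Iterator<BlockingQueue<Row>> it = active.iterator(); it.hasNext(); )
                      {
                          Row head = it.next().take();
                          if (head == END) it.remove(); else heads.add(head);
                      }
                      if (!heads.isEmpty())
                          writeQueue.put(merger.merge(heads));
                  }
                  writeQueue.put(END);
                  writer.join();
              }

              interface Stage { void run() throws InterruptedException; }

              static Thread start(Stage body)
              {
                  Thread t = new Thread(() -> {
                      try { body.run(); }
                      catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                  });
                  t.start();
                  return t;
              }
          }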

          Germán Kondolf added a comment - edited

          I like the whole idea.

          we still have compaction threads tuned to low priority, by default, so the impact on the rest of the system won't be very different. Nor do we expect to have so many input sstables that we lose a lot in context switching between reader threads.

          I've tried a patch to set the I/O Priority too (in a 0.6.x patch); maybe we could add that configuration to the compaction process, to keep the impact low.
          It's limited to Linux, but I think that's not a problem.

          Also, using that queue in between, but limited to a configurable size, the producer stage (MERGE) would wait for an available place in the queue.
          We could tune how many newly merged rows we want to buffer before we write them to disk and, indirectly, control the memory used in the process.
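
          For what it's worth, that bounded hand-off maps directly onto a fixed-capacity queue; a tiny sketch with a made-up capacity and setting name (nothing here is real Cassandra configuration):

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          class BoundedMergeBuffer
          {
              // would be read from configuration, e.g. a hypothetical compaction_buffered_rows setting
              static final int BUFFERED_ROWS = 16;

              // put() blocks once BUFFERED_ROWS merged rows are queued, so the merge stage
              // waits for the writer to drain, indirectly capping the memory in flight
              static final BlockingQueue<Object> mergedRows = new ArrayBlockingQueue<>(BUFFERED_ROWS);
          }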

          If you want, let me update the trunk and prepare a draft-patch.
          What do you think?

          Jonathan Ellis added a comment -

          I've tried a patch to set the I/O Priority too

          It's worth a try; it probably belongs either on CASSANDRA-1882 or a new ticket. (Does it use JNA, then?)

          We could tune how many newly merged rows we want to buffer before we write them to disk and, indirectly, control the memory used in the process.

          I don't understand how this is an improvement over just letting BufferedRandomAccessFile control the buffering.

          If you want, let me update the trunk and prepare a draft-patch

          Absolutely.

          Stu Hood added a comment -

          I think this one has been sufficiently resolved in trunk.

          Jonathan Ellis added a comment -

          Created CASSANDRA-2901 to follow up on concurrency-for-single-merge.


            People

            • Assignee: Unassigned
            • Reporter: Germán Kondolf
            • Votes: 0
            • Watchers: 2
