ACCUMULO-2889: Batch metadata table updates for new walogs

    Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.5.1, 1.6.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Currently, when we update the metadata table with new loggers, we update the metadata for each tablet serially. We could instead use a BatchWriter to send the metadata updates for all tablets in a single batch.

      A few special cases include:

      • What if the !METADATA tablet was included in the batch?
      • What about the root tablet?

      Benefit:
      In one of our clusters, we're experiencing particularly slow HDFS operations, leading to large oscillations in ingest performance. We haven't isolated the cause in HDFS, but when we profiled the tservers, we noticed they were waiting for metadata table operations to complete. This change targets that waiting.

      Potential downsides:
      Given the existing locking scheme, it looks like we may have to lock a tablet for slightly longer (we'll lock for the duration of the batch).
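
      As a rough illustration of the batching idea (this is not the tserver code; the table name, row/column layout, and the addNewLogEntries helper are made up for the example), a single BatchWriter can carry the new-walog mutations for every tablet and flush them once:

      import java.util.Map;

      import org.apache.accumulo.core.client.BatchWriter;
      import org.apache.accumulo.core.client.BatchWriterConfig;
      import org.apache.accumulo.core.client.Connector;
      import org.apache.accumulo.core.client.MutationsRejectedException;
      import org.apache.accumulo.core.client.TableNotFoundException;
      import org.apache.accumulo.core.data.Mutation;
      import org.apache.accumulo.core.data.Value;
      import org.apache.hadoop.io.Text;

      public class WalogMetadataBatchExample {
        // Hypothetical helper: one mutation per tablet recording the new walog,
        // written as a single batch instead of one serial update per tablet.
        static void addNewLogEntries(Connector conn, Map<String,String> tabletRowToWalog)
            throws TableNotFoundException, MutationsRejectedException {
          BatchWriter bw = conn.createBatchWriter("!METADATA", new BatchWriterConfig());
          try {
            for (Map.Entry<String,String> e : tabletRowToWalog.entrySet()) {
              Mutation m = new Mutation(new Text(e.getKey()));      // tablet's metadata row
              m.put(new Text("log"), new Text(e.getValue()),        // illustrative log column
                  new Value(e.getValue().getBytes()));
              bw.addMutation(m);
            }
            bw.flush();   // one flush covers every tablet in the batch
          } finally {
            bw.close();
          }
        }
      }

      The two special cases above would still need separate handling, since the !METADATA tablet's own log entries (and the root tablet's) are not written through an ordinary metadata-table BatchWriter.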

      Attachments

      1. accumulo-2889_withoutpatch.png
        475 kB
        Jonathan Park
      2. ACCUMULO-2889.0.patch.txt
        30 kB
        Jonathan Park
      3. ACCUMULO-2889.1.patch
        35 kB
        Jonathan Park
      4. ACCUMULO-2889.2.patch
        33 kB
        Jonathan Park
      5. accumulo-2889-withpatch.png
        464 kB
        Jonathan Park
      6. batch_perf_test.sh
        0.5 kB
        Jonathan Park
      7. run_all.sh
        0.3 kB
        Jonathan Park
      8. start-ingest.sh
        2 kB
        Jonathan Park

          Activity

          Jonathan Park added a comment - edited

          Our current proposal:

          TabletServer client threads:
          order(commitSessions) // this is to avoid deadlock across multiple client threads
          
          batch.start
          foreach tablet:
            tablet.logLock.lock
            if (tablet.mustRegisterNewLoggers)
              then
                defineTablet(tablet) // write WAL entry for tablet
                tablet.addLoggerToMetadataBatch(batch)
                // hold onto the lock
              else
                tablet.logLock.release
          
          batch.flush
          release(allCurrentlyHeldLocks);
          

          (edit: added some formatting to make this more readable)
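
          In Java-ish terms the same flow might look like the sketch below (Tablet, MetadataBatch, and their methods are placeholders taken from the pseudocode above, not existing tserver internals):

          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.List;
          import java.util.concurrent.locks.ReentrantLock;

          class WalogBatchFlow {
            // Hypothetical stand-ins for the tserver internals named in the proposal.
            interface Tablet extends Comparable<Tablet> {
              ReentrantLock logLock();
              boolean mustRegisterNewLoggers();
              void defineTablet();                             // write the DEFINE entry to the WAL
              void addLoggerToMetadataBatch(MetadataBatch b);  // queue the metadata mutation
            }

            interface MetadataBatch {
              void flush();
            }

            static void registerNewLoggers(List<Tablet> tablets, MetadataBatch batch) {
              Collections.sort(tablets);            // fixed ordering avoids deadlock across client threads
              List<ReentrantLock> held = new ArrayList<ReentrantLock>();
              try {
                for (Tablet t : tablets) {
                  ReentrantLock lock = t.logLock();
                  lock.lock();
                  held.add(lock);                   // track immediately so the finally block always unlocks
                  if (t.mustRegisterNewLoggers()) {
                    t.defineTablet();
                    t.addLoggerToMetadataBatch(batch);
                  } else {
                    held.remove(held.size() - 1);   // nothing to batch for this tablet
                    lock.unlock();
                  }
                }
                batch.flush();                      // one metadata flush for every tablet in the batch
              } finally {
                for (ReentrantLock l : held)
                  l.unlock();
              }
            }
          }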

          Jonathan Park added a comment -

          First pass at batching metadata updates for new WALs. I'll attach a screenshot of its effects as well.

          Eric Newton added a comment -

          Jonathan Park, I'm very interested in seeing the effects (screenshot).

          Jonathan Park added a comment -

          Results from performance tests:

          Test design:

          • Run continuous ingest with 4 ingesters, each ingesting 25 million entries, and measure the time until completion.
          • We varied the number of minor compactors and tablets per server (in retrospect, the number of minor compactors didn't really matter in these tests; it may have been better to vary the number of clients).
          • Each trial was run 3x and the average was taken.

          Tests were run on a single node (24 logical cores, 64 GB RAM, 8 drives)

          minc   tablets/server   w/o patch (ms)   w/ patch (ms)   ratio (w/ ÷ w/o)
            4          32            269790.33       257537.33     0.95458325
           12          32            271124.33       255952        0.94403922
           12         320            355962.67       323737        0.90946896
           24          32            268709          261362.67     0.97266065
           24         320            355182.33       324308.67     0.91307659

          I'll try to run this on a multi-node cluster if I can get around to it.

          Josh Elser added a comment -

          I've been looking over the patch after seeing lower performance numbers than I had hoped for.

          I'm wary of using the ThreadLocal as a way to synchronize updates. I don't think we're getting as much batching as we could because the tabletserver is going to be running multiple clients in their own threads. You'll only get batching when a single write session writes to multiple tablets on that one tserver. I think trying to synchronize access around the InternalBatchWriter class (or use a concurrent data structure as the queue) might be cleaner to understand and would cache across different threads. Switching to a concurrent data structure would also need some extra synchronization around commitBatch() as you'd have a race condition on clearing the data structure after a flush() on the underlying batchwriter.

          Actually, looking at ThriftClientHandler.applyUpdates(TInfo, long, TKeyExtent, List<TMutation>), I'm not sure you're going to be getting any batching at all. The tserver is only receiving updates for a single tablet in one thrift call (a thread) which means that all writes to multiple tablets are in different threads. I could be missing something, but that might drive home the point.

          Ideally, you'd want something that can very quickly (maybe even lock free somehow?) add Mutations that need to be flush()'ed and then get a single notification point that all of the threads could wait on to know that the sync happened (a CountdownLatch, perhaps).
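
          A minimal sketch of that shape (the names are mine, not from the patch; the BatchWriter is assumed to already target the metadata table, and error handling is omitted): submitters enqueue under a short lock and pick up the latch for the current batch, while a single flusher thread swaps the batch out, flushes, and trips that latch.

          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.CountDownLatch;

          import org.apache.accumulo.core.client.BatchWriter;
          import org.apache.accumulo.core.client.MutationsRejectedException;
          import org.apache.accumulo.core.data.Mutation;

          public class BatchedMetadataWriter {
            private final BatchWriter writer;                 // assumed: writes to the metadata table
            private List<Mutation> pending = new ArrayList<Mutation>();
            private CountDownLatch flushed = new CountDownLatch(1);

            public BatchedMetadataWriter(BatchWriter writer) {
              this.writer = writer;
            }

            /** Enqueue mutations and return the latch to await for the covering sync. */
            public synchronized CountDownLatch submit(List<Mutation> mutations) {
              pending.addAll(mutations);
              return flushed;
            }

            /** Called by a single background thread: drain, flush, notify waiters. */
            public void flushPending() throws MutationsRejectedException {
              List<Mutation> toWrite;
              CountDownLatch toRelease;
              synchronized (this) {
                if (pending.isEmpty())
                  return;
                toWrite = pending;
                toRelease = flushed;
                pending = new ArrayList<Mutation>();          // next batch gets a fresh latch
                flushed = new CountDownLatch(1);
              }
              writer.addMutations(toWrite);                   // outside the lock: submitters aren't blocked
              writer.flush();
              toRelease.countDown();                          // everyone in that batch sees the sync
            }
          }

          A client thread would call submit(myMutations).await() and unblock as soon as the flush that covers its mutations completes.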

          Eric Newton added a comment -

          We could just add all the hosted tablets to the log when it is new (common), and when new tablets are hosted (rare). This would cause unnecessary recoveries in the case where a tablet is flushed or isn't otherwise using the log.

          Keith Turner added a comment -

          The calls to ThriftClientHandler.applyUpdates(TInfo, long, TKeyExtent, List<TMutation>) are async thrift calls. The client makes the calls against a sessionid. On the tserver side it buffers data for that sessionid (periodically dumping to walog and in-mem map depending on memory use)

          For Accismus I wrote a little wrapper that allows multiple threads to submit mutations to a batch writer and flush. Something like this could possibly be used for what Josh Elser is suggesting, allowing multiple threads to batch writes to the metadata table. This wrapper is a WIP:

          https://github.com/keith-turner/Accismus/blob/b06dcdd4494901c81d9bc3e8be73830fdc88d377/modules/core/src/main/java/accismus/impl/SharedBatchWriter.java

          For my purposes this is still not where I want it to be, because a thread has to wait until all other threads' mutations are flushed. Ideally, once its own mutations are flushed, it could continue. For batching writes to the metadata table, this case would matter when the metadata table is split into many tablets.

          Josh Elser added a comment -

          We could just add all the hosted tablets to the log when it is new (common), and when new tablets are hosted (rare).

          Could also try to be a little smarter and keep some basic measure of "recently written to" tablets and update the log entry there instead of all tablets. That would help the write-once, read-many case that's common.

          This would cause unnecessary recoveries in the case where a tablet is flushed or isn't otherwise using the log.

          I'm not entirely clear on what performing log recovery for a tablet using a WAL that doesn't have any updates for that tablet entails. Would we just be sorting and re-reading that WAL to not find any updates that need to be replayed? Do you think we would incur a noticeable penalty for doing this?

          Eric Newton added a comment -

          Unless every tablet isn't using the WALog, it's going to need to be sorted anyhow. The (merged) read of the sorted results should only look at the DEFINE and MinorCompaction markers, which are all in the beginning of the sorted results. I think this would be very efficient, but some testing would tell us for sure.

          Josh Elser added a comment -

          The calls to ThriftClientHandler.applyUpdates(TInfo, long, TKeyExtent, List<TMutation>) are async thrift calls. The client makes the calls against a sessionid. On the tserver side it buffers data for that sessionid (periodically dumping to walog and in-mem map depending on memory use)

          But since those are separate thrift calls, they'll all be done in their own thread, right? That's the only point I was trying to make. I wouldn't think that async thrift calls are handled outside of the normal threadpool, but I haven't read the code either.

          The SharedBatchWriter is (scarily) exactly what I was thinking about. Get out of my head, Keith Turner.

          Keith Turner added a comment -

          But since those are separate thrift calls, they'll all be done in their own thread, right?

          No, one thread will take all of the tablet updates for a tablet server and repeatedly call that method. For the gory details, see sendMutationsToTabletServer() in TabletServerBatchWriter (there is a special case for a single mutation, skip past that).

          Jonathan Park added a comment -

          I'll gather a new set of #s when I get access to a cluster of machines.


            People

            • Assignee: Jonathan Park
            • Reporter: Jonathan Park
            • Votes: 0
            • Watchers: 3
