HBase / HBASE-3767

Improve how HTable handles threads used for multi actions

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.90.2
    • Fix Version/s: 0.90.3
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed
    • Release Note:
      hbase.htable.threads.max now defaults to Integer.MAX_VALUE. Setting it below the number of region servers can result in a RejectedExecutionException (a small configuration sketch follows this list).
    • Tags: noob
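
    The release note describes a configuration cap. As a minimal sketch, assuming a client-side HBaseConfiguration, this is how that cap could be overridden; the class name and the value 256 are illustrative only and not part of this issue:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;

      public class ThreadsMaxConfigSketch {
        public static Configuration create() {
          Configuration conf = HBaseConfiguration.create();
          // The default is now Integer.MAX_VALUE; if a cap is set, keep it at or above
          // the number of region servers to avoid RejectedExecutionException.
          conf.setInt("hbase.htable.threads.max", 256);  // 256 is an arbitrary illustrative cap
          return conf;
        }
      }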

      Description

      When creating a new HTable we have to query ZK to learn the number of region servers in the cluster. That query is currently done for every single HTable; instead, I think we should do it once per JVM and then reuse the cached number for all subsequent HTables.
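
      A minimal sketch of that per-JVM caching idea, keyed per Configuration as in the Map<Configuration, Integer> approach mentioned in the comments; the class and interface names are hypothetical and this is not the committed patch:

        import java.io.IOException;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;

        import org.apache.hadoop.conf.Configuration;

        final class RegionServerCountCache {
          private static final Map<Configuration, Integer> CACHE =
              new ConcurrentHashMap<Configuration, Integer>();

          /** Abstracts the expensive ZK lookup so the sketch stays API-neutral. */
          interface CountLoader {
            int load() throws IOException;
          }

          static int get(Configuration conf, CountLoader loader) throws IOException {
            Integer cached = CACHE.get(conf);
            if (cached == null) {
              cached = loader.load();   // the existing ZK-based region server count query
              CACHE.put(conf, cached);  // note: Configuration keys are compared by identity
            }
            return cached;
          }
        }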

      Attachments

      1. HBASE-3767.patch (3 kB) - Jean-Daniel Cryans
      2. HBASE-3767-v2.patch (7 kB) - Jean-Daniel Cryans


          Activity

          Jean-Daniel Cryans added a comment -

          Yeah, maybe deprecate it first and print a WARN message, then eventually remove it.

          Nicolas Spiegelberg added a comment -

          @JD: From your description of the SynchronousQueue

          Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks.
          

          It seems like we should either add rejected task handling or not allow the user to specify "hbase.htable.threads.max" anymore, correct?
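
          One way to read "add rejected task handling" is to give the direct-handoff pool a RejectedExecutionHandler instead of letting it throw RejectedExecutionException. This is only a minimal sketch under that assumption, not what the patch does; the class name and pool sizes are illustrative:

            import java.util.concurrent.SynchronousQueue;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            public class RejectedHandlingSketch {
              public static void main(String[] args) {
                // Direct-handoff pool with a bounded max; when all threads are busy,
                // CallerRunsPolicy runs the task in the submitting thread instead of
                // throwing RejectedExecutionException.
                ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 32,                            // core 1, max 32 (illustrative cap)
                    60, TimeUnit.SECONDS,             // idle threads die after 60s
                    new SynchronousQueue<Runnable>(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
                pool.execute(new Runnable() {
                  public void run() { System.out.println("ran"); }
                });
                pool.shutdown();
              }
            }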

          Dave Latham added a comment -

          That was quick, thanks!

          Jean-Daniel Cryans added a comment -

          Yep, Ted noticed that earlier and I just committed the fix.

          For 0.90 I just re-added the method in HTable.

          For trunk I moved it to HConnection instead.

          Dave Latham added a comment -

          RegionSplitter and TestAdmin both reference HTable.getCurrentNrHRS() which this patch has removed.

          Jean-Daniel Cryans added a comment -

          Committed to branch and trunk, thanks for the review Stack.

          stack added a comment -

          +1

          Jean-Daniel Cryans added a comment -

          This patch does two things on top of the last one:

          • It adds a package-protected method to get access to the pool.
          • It adds a new test to verify the behavior of the TPE; the method's comments describe what it does (a rough sketch of that kind of test follows below).
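
          A minimal sketch of that kind of TPE behavior test, assuming JUnit 4 and a hypothetical test class name; it is not the test shipped in the patch, just the direct-handoff behavior it is meant to verify:

            import static org.junit.Assert.assertEquals;
            import static org.junit.Assert.assertTrue;

            import java.util.concurrent.CountDownLatch;
            import java.util.concurrent.SynchronousQueue;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            import org.junit.Test;

            public class TestHTablePoolBehavior {
              @Test
              public void threadCountFollowsOutstandingTasks() throws Exception {
                ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, Integer.MAX_VALUE, 1, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
                pool.allowCoreThreadTimeOut(true);

                int tasks = 5;
                final CountDownLatch release = new CountDownLatch(1);
                for (int i = 0; i < tasks; i++) {
                  pool.execute(new Runnable() {
                    public void run() {
                      try { release.await(); } catch (InterruptedException ignored) { }
                    }
                  });
                }
                // Direct handoff: one thread per outstanding task, not capped at corePoolSize.
                assertEquals(tasks, pool.getPoolSize());

                release.countDown();
                Thread.sleep(3000);  // longer than the 1 second keep-alive
                // With allowCoreThreadTimeOut(true) the pool shrinks back down when idle.
                assertTrue(pool.getPoolSize() < tasks);
                pool.shutdown();
              }
            }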
          stack added a comment -

          Patch looks good to me. Would feel more comfortable if there were a unit test proving the pool works as we expect (the code pasted above by Ted only provokes me to ask questions about how it actually works).

          Ted Yu added a comment -

          Re-pasting source code due to garbled display above:

           497:                 switch (runState) {
           498:                 case RUNNING: {
           499:                     // untimed wait if core and not allowing core timeout
           500:                     if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
           501:                         return workQueue.take();
           502: 
           503:                     long timeout = keepAliveTime;
           504:                     if (timeout <= 0) // die immediately for 0 timeout
           505:                         return null;
           506:                     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
           507:                     if (r != null)
           508:                         return r;
           509:                     if (poolSize > corePoolSize || allowCoreThreadTimeOut)
           510:                         return null; // timed out
           511:                     // Else, after timeout, the pool shrank. Retry
           512:                     break;
           513:                 }
          
          Ted Yu added a comment -

          The javadoc fragment doesn't mention allowCoreThreadTimeOut.

          From http://fuseyism.com/classpath/doc/java/util/concurrent/ThreadPoolExecutor-source.html:

          
          

           494:     Runnable getTask() {
           495:         for (;;) {
           496:             try {
           497:                 switch (runState) {
           498:                 case RUNNING: {
           499:                     // untimed wait if core and not allowing core timeout
           500:                     if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
           501:                         return workQueue.take();
           502: 
           503:                     long timeout = keepAliveTime;
           504:                     if (timeout <= 0) // die immediately for 0 timeout
           505:                         return null;
           506:                     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
           507:                     if (r != null)
           508:                         return r;
           509:                     if (poolSize > corePoolSize || allowCoreThreadTimeOut)
           510:                         return null; // timed out
           511:                     // Else, after timeout, the pool shrank. Retry
           512:                     break;
           513:                 }
          
          

          In HTable(), allowCoreThreadTimeOut is set to true. So we're not bounded by corePoolSize threads.
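
          A quick way to see that effect is a minimal sketch (illustrative class name, 1 second keep-alive) showing the pool dropping below corePoolSize once allowCoreThreadTimeOut(true) is set:

            import java.util.concurrent.SynchronousQueue;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            public class CoreTimeoutSketch {
              public static void main(String[] args) throws Exception {
                ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, Integer.MAX_VALUE, 1, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
                pool.allowCoreThreadTimeOut(true);   // core threads take the timed poll() branch in getTask()

                pool.execute(new Runnable() { public void run() { } });
                System.out.println("after submit: " + pool.getPoolSize());  // typically 1

                Thread.sleep(3000);                  // longer than the 1 second keep-alive
                System.out.println("after idle:   " + pool.getPoolSize());  // 0: not pinned at corePoolSize
                pool.shutdown();
              }
            }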

          Jean-Daniel Cryans added a comment -

          So the current way we handle the TPE is called "unbounded queues", from the javadoc:

          Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.

          The important part is that no more than corePoolSize threads will ever be created, maxPoolSize isn't used, and the rest is just queued. This is why it's important in that context to know the number of region servers since you want maximum parallelism.

          Instead, using the "direct handoff" strategy, new threads are created as soon as tasks would otherwise have to be queued, meaning that the number of threads naturally goes up to the number of region servers, even if that number changes. From the javadoc:

          Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.

          We will never suffer from what is described in that last sentence since HCM will only create as many Runnables as there are RS that contain the regions that we need to talk to.
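
          A minimal side-by-side sketch of the two strategies described above; the pool parameters are illustrative rather than the exact values used in the patch:

            import java.util.concurrent.LinkedBlockingQueue;
            import java.util.concurrent.SynchronousQueue;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            public class QueueStrategySketch {
              // "Unbounded queue": never more than nrThreads workers; extra tasks wait in the
              // queue, so maximumPoolSize is effectively ignored.
              static ThreadPoolExecutor unboundedQueuePool(int nrThreads) {
                return new ThreadPoolExecutor(nrThreads, nrThreads,
                    60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
              }

              // "Direct handoff": nothing is queued; each submission either reuses an idle
              // worker or starts a new thread, so the pool grows with the number of region
              // servers being written to and shrinks again after the keep-alive.
              static ThreadPoolExecutor directHandoffPool() {
                ThreadPoolExecutor pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
                    60, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
                pool.allowCoreThreadTimeOut(true);
                return pool;
              }
            }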

          Jean-Daniel Cryans added a comment -

          My expectation is that in a long-living JVM the HTables are created early rather than late; for example, with our Thrift servers, once you've served each table from each thread you already have all your HTables.

          In the case of a bulk upload, the processes are usually short-lived MR tasks and the HTable is created up front.

          If we had oversized TPE it'd grow as servers grew.

          I'd actually prefer that to setting the max to CPU times some number. Default the max to 1000 and lower-bound it at 1?

          Aside: Whose idea was the passing of an ExecutorService from HTable down into HCM for it to use? That's a little perverse

          Your favorite Canadian, guess which one

          stack added a comment -

          If the HTable is already created, currently you still have just 1 thread in the TPE.

          But because we are not caching, the next HTable creation will have a right-sized TPE.

          If we had oversized TPE it'd grow as servers grew.

          Aside: Whose idea was the passing of an ExecutorService from HTable down into HCM for it to use? That's a little perverse

          Jean-Daniel Cryans added a comment -

          So if we create an HTable with one RegionServer in the cluster and then add ten nodes, we'll have cached 1 rather than 10

          If the HTable is already created, currently you still have just 1 thread in the TPE.

          stack added a comment -

          So if we create an HTable with one RegionServer in the cluster and then add ten nodes, we'll have cached 1 rather than 10. That's going to make it a pain to debug why an upload is slow.

          Oversize the executor pool and have it shrink back down when unused (as per Ted above)?

          Nicolas Spiegelberg added a comment -

          We were talking about this exact issue last week. I think caching the RS count per cluster is the correct way to go. I hacked together a Map<Configuration, Integer> but never finished it

          Jean-Daniel Cryans added a comment -

          @Ted, I guess the improvement provided by setting the core pool size is minimal... I think I wouldn't even bother going all the way to using the number of CPUs and just start with 1. Cleaner code.

          Ted Yu added a comment -

          We can set core pool size to be the number of available processors and set max pool size to be (large) multiple of the number of available processors.
          ThreadPoolExecutor is able to dynamically shrink thread count when appropriate.
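
          As a minimal sketch of that sizing (illustrative class name and multiplier, and a SynchronousQueue so that the maximum pool size is actually honoured; with an unbounded work queue the pool would never grow past corePoolSize):

            import java.util.concurrent.SynchronousQueue;
            import java.util.concurrent.ThreadPoolExecutor;
            import java.util.concurrent.TimeUnit;

            public class ProcessorSizedPoolSketch {
              public static ThreadPoolExecutor create() {
                int cpus = Runtime.getRuntime().availableProcessors();
                ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    cpus,                  // core: one thread per available processor
                    cpus * 8,              // max: a (large) multiple of the processor count
                    60, TimeUnit.SECONDS,  // idle threads are reclaimed after 60s
                    new SynchronousQueue<Runnable>());
                pool.allowCoreThreadTimeOut(true);  // lets the pool shrink when idle
                return pool;
              }
            }

          Note that with a direct handoff and a bounded max, submissions beyond cpus * 8 busy threads would still be rejected, which ties back to the rejection discussion above.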

          Jean-Daniel Cryans added a comment -

          And if the number of region servers changes, are there repercussions?

          Currently, once the HTable is created its ThreadPoolExecutor stays the same size regardless of the changing number of region servers. Caching it here has the same behavior. Where it changes is if an HTable is created later, after the number of region servers changes, but running with fewer threads than the total number of region servers is only less efficient under bulk load situations where you need to insert into all of them at the same time (which I believe isn't frequent when uploading; usually you create the HTables up front). That's the only repercussion I see, and it's still less bad than the following:

          That's better than doing getCurrentNrHRS. Maybe 2 * the number of processors.

          So the reason we use the number of RS is to be able to insert into all the region servers at the same time in a bulk upload case. Using the number of CPUs by itself isn't particularly useful since uploading isn't CPU-intensive on the client (it's just threads waiting on region servers), and the fact that you usually have many HTables per JVM kinda defeats the purpose of limiting the number of executors.

          I personally like the fact that we try to learn how many RS there are in order to tune the TPE; it's just that calling it every time is rather expensive and mostly useless. I still believe we should just cache it.

          Ted Yu added a comment -

          The following call would end up in native code and give us the answer:

          Runtime.getRuntime().availableProcessors()
          
          stack added a comment -

          Can we default the value for hbase.htable.threads.max using a multiple of the available processors?

          That's better than doing getCurrentNrHRS. Maybe 2 * the number of processors. We'd have to do a call outside of Java to figure out system characteristics?

          Ted Yu added a comment -

          The number of region servers in the cluster is used in the following:

              int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
          ...
              this.pool = new ThreadPoolExecutor(nrThreads, nrThreads,
          

          Can we default the value for hbase.htable.threads.max using a multiple of the available processors?

          stack added a comment -

          And if the number of region servers changes, are there repercussions?

          I wonder why we need to know about regionservers at all on construction? Can we not learn about them lazily, as the client figures out it needs a region on said host?


            People

            • Assignee: Jean-Daniel Cryans
            • Reporter: Jean-Daniel Cryans
            • Votes: 0
            • Watchers: 1
