diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d5e2e86..ef3801b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -317,6 +317,11 @@ public class HRegion implements HeapSize { // , Writable{ // purge timeout, when a RPC call will be terminated by the RPC engine. final long maxBusyWaitDuration; + // bulkload acquires the write lock, and waiting a long time for write lock can block other + // read/update operations, so we need another param to tune the wait duration of bulkload, and it + // can be configured by table + final long bulkloadWaitDuration; + // negative number indicates infinite timeout static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L; final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool(); @@ -620,6 +625,8 @@ public class HRegion implements HeapSize { // , Writable{ this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout", conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.bulkloadWaitDuration = conf.getLong("hbase.bulkload.wait.duration", -1); + /* * timestamp.slop provides a server-side constraint on the timestamp. This * assumes that you base your TS around currentTimeMillis(). In this case, @@ -3782,7 +3789,7 @@ public class HRegion implements HeapSize { // , Writable{ BulkLoadListener bulkLoadListener) throws IOException { Preconditions.checkNotNull(familyPaths); // we need writeLock for multi-family bulk load - startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths)); + startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths), true); try { this.writeRequestsCount.increment(); @@ -6172,10 +6179,18 @@ public class HRegion implements HeapSize { // , Writable{ */ private void startBulkRegionOperation(boolean writeLockNeeded) throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { + startBulkRegionOperation(writeLockNeeded, false); + } + + /** + * Bulkload use this api to tune the wait duration + */ + private void startBulkRegionOperation(boolean writeLockNeeded, boolean isBulkload) + throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { if (this.closing.get()) { throw new NotServingRegionException(getRegionNameAsString() + " is closing"); } - if (writeLockNeeded) lock(lock.writeLock()); + if (writeLockNeeded) lock(lock.writeLock(), isBulkload); else lock(lock.readLock()); if (this.closed.get()) { if (writeLockNeeded) lock.writeLock().unlock(); @@ -6230,9 +6245,23 @@ public class HRegion implements HeapSize { // , Writable{ */ private void lock(final Lock lock, final int multiplier) throws RegionTooBusyException, InterruptedIOException { + lock(lock, multiplier, false); + } + + private void lock(final Lock lock, final boolean isBulkload) throws RegionTooBusyException, + InterruptedIOException { + lock(lock, 1, isBulkload); + } + + private void lock(final Lock lock, final int multiplier, final boolean isBulkload) + throws RegionTooBusyException, InterruptedIOException { try { - final long waitTime = Math.min(maxBusyWaitDuration, - busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier)); + long generalWaitTime = + Math.min(maxBusyWaitDuration, + busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier)); + final long waitTime = + (isBulkload && (0 < this.bulkloadWaitDuration)) ? this.bulkloadWaitDuration + : generalWaitTime; if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { throw new RegionTooBusyException( "failed to get a lock in " + waitTime + " ms. " +