From 85c44433e14e65d88c0a2403c18a1c304c62a1b6 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 9 Jan 2015 19:33:30 -0800 Subject: [PATCH] HBASE-12729 Backport HBASE-5162 (Basic client pushback mechanism) to 0.98 --- .../apache/hadoop/hbase/client/AsyncProcess.java | 161 ++-- .../apache/hadoop/hbase/client/ClientScanner.java | 5 +- .../hadoop/hbase/client/ClientSmallScanner.java | 4 +- .../apache/hadoop/hbase/client/DelayingRunner.java | 117 +++ .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 4 +- .../hadoop/hbase/client/HConnectionManager.java | 32 +- .../org/apache/hadoop/hbase/client/HTable.java | 4 +- .../hadoop/hbase/client/HTableMultiplexer.java | 6 +- .../apache/hadoop/hbase/client/MultiAction.java | 17 + .../org/apache/hadoop/hbase/client/Result.java | 17 + .../hadoop/hbase/client/ResultStatsUtil.java | 76 ++ .../hadoop/hbase/client/RpcRetryingCaller.java | 4 +- .../hbase/client/RpcRetryingCallerFactory.java | 41 +- .../hbase/client/ServerStatisticTracker.java | 74 ++ .../hadoop/hbase/client/StatisticsHConnection.java | 39 + .../client/StatsTrackingRpcRetryingCaller.java | 71 ++ .../hbase/client/backoff/ClientBackoffPolicy.java | 42 ++ .../client/backoff/ClientBackoffPolicyFactory.java | 59 ++ .../backoff/ExponentialClientBackoffPolicy.java | 71 ++ .../hbase/client/backoff/ServerStatistics.java | 68 ++ .../hadoop/hbase/protobuf/ResponseConverter.java | 19 +- .../hbase/client/TestClientExponentialBackoff.java | 110 +++ .../java/org/apache/hadoop/hbase/HConstants.java | 5 + .../hbase/protobuf/generated/ClientProtos.java | 808 ++++++++++++++++++++- .../hbase/protobuf/generated/MasterProtos.java | 2 + .../protobuf/generated/VisibilityLabelsProtos.java | 6 +- hbase-protocol/src/main/protobuf/Client.proto | 10 + .../hadoop/hbase/client/TestClientPushback.java | 96 +++ .../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 33 +- .../hadoop/hbase/regionserver/HRegionServer.java | 25 +- .../hbase/regionserver/wal/WALEditsReplaySink.java | 5 +- 32 files changed, 1915 insertions(+), 118 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/client/TestClientPushback.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index d206c71..d42a697 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -539,57 +540,133 @@ class AsyncProcess { // Send the queries and add them to the inProgress list // This iteration is by server (the HRegionLocation comparator is by server portion only). for (Map.Entry> e : actionsByServer.entrySet()) { - final HRegionLocation loc = e.getKey(); - final MultiAction multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), loc.getServerName()); - Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { - @Override - public void run() { - MultiResponse res; + HRegionLocation loc = e.getKey(); + MultiAction multiAction = e.getValue(); + Collection runnables = getNewMultiActionRunnable(initialActions, loc, + multiAction, numAttempt, errorsByServer); + for (Runnable runnable: runnables) { + try { + incTaskCounters(multiAction.getRegions(), loc.getServerName()); + this.pool.submit(runnable); + } catch (RejectedExecutionException ree) { + // This should never happen. But as the pool is provided by the end user, let's secure + // this a little. + decTaskCounters(multiAction.getRegions(), loc.getServerName()); + LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + + " Server is " + loc.getServerName(), ree); + // We're likely to fail again, but this will increment the attempt counter, so it will + // finish. + receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer); + } + } + } + } + + private Runnable getNewSingleServerRunnable( + final List> initialActions, + final HRegionLocation loc, + final MultiAction multiAction, + final int numAttempt, + final HConnectionManager.ServerErrorTracker errorsByServer) { + return new Runnable() { + @Override + public void run() { + MultiResponse res; + try { + MultiServerCallable callable = createCallable(loc, multiAction); try { - MultiServerCallable callable = createCallable(loc, multiAction); - try { - res = createCaller(callable).callWithoutRetries(callable, timeout); - } catch (IOException e) { - // The service itself failed . It may be an error coming from the communication - // layer, but, as well, a functional error raised by the server. - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e, - errorsByServer); - return; - } catch (Throwable t) { - // This should not happen. Let's log & retry anyway. - LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + - " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t); - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t, - errorsByServer); - return; - } + res = createCaller(callable).callWithoutRetries(callable, timeout); + } catch (IOException e) { + // The service itself failed . It may be an error coming from the communication + // layer, but, as well, a functional error raised by the server. 
+ receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e, + errorsByServer); + return; + } catch (Throwable t) { + // This should not happen. Let's log & retry anyway. + LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + + " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t); + receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t, + errorsByServer); + return; + } - // Nominal case: we received an answer from the server, and it's not an exception. - receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer); + // Nominal case: we received an answer from the server, and it's not an exception. + receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer); - } finally { - decTaskCounters(multiAction.getRegions(), loc.getServerName()); - } + } finally { + decTaskCounters(multiAction.getRegions(), loc.getServerName()); } - }); - - try { - this.pool.submit(runnable); - } catch (RejectedExecutionException ree) { - // This should never happen. But as the pool is provided by the end user, let's secure - // this a little. - decTaskCounters(multiAction.getRegions(), loc.getServerName()); - LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + loc.getServerName(), ree); - // We're likely to fail again, but this will increment the attempt counter, so it will - // finish. - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer); } + }; + } + + private Collection getNewMultiActionRunnable( + final List> initialActions, + final HRegionLocation loc, + final MultiAction multiAction, + final int numAttempt, + final HConnectionManager.ServerErrorTracker errorsByServer) { + // no stats to manage, just do the standard action + if (!(AsyncProcess.this.hConnection instanceof StatisticsHConnection) || + ((StatisticsHConnection)AsyncProcess.this.hConnection).getStatisticsTracker() == null) { + List toReturn = new ArrayList(1); + toReturn.add(Trace.wrap("AsyncProcess.sendMultiAction", + getNewSingleServerRunnable(initialActions, loc, multiAction, numAttempt, + errorsByServer))); + return toReturn; + } else { + // group the actions by the amount of delay + Map actions = new HashMap(multiAction + .size()); + + // split up the actions + for (Map.Entry>> e : multiAction.actions.entrySet()) { + Long backoff = getBackoff(loc); + DelayingRunner runner = actions.get(backoff); + if (runner == null) { + actions.put(backoff, new DelayingRunner(backoff, e)); + } else { + runner.add(e); + } + } + + List toReturn = new ArrayList(actions.size()); + for (DelayingRunner runner : actions.values()) { + String traceText = "AsyncProcess.sendMultiAction"; + Runnable runnable = getNewSingleServerRunnable(initialActions, loc, runner.getActions(), + numAttempt, errorsByServer); + // use a delay runner only if we need to sleep for some time + if (runner.getSleepTime() > 0) { + runner.setRunner(runnable); + traceText = "AsyncProcess.clientBackoff.sendMultiAction"; + runnable = runner; + } + runnable = Trace.wrap(traceText, runnable); + toReturn.add(runnable); + } + return toReturn; } } /** + * @param server server location where the target region is hosted + * @param regionName name of the region which we are going to write some data + * @return the amount of time the client should wait until it submit a request to the + * specified server and region + */ + private Long getBackoff(HRegionLocation location) { + 
Preconditions.checkState(AsyncProcess.this.hConnection instanceof StatisticsHConnection, + "AsyncProcess connection should be a StatisticsHConnection"); + ServerStatisticTracker tracker = ((StatisticsHConnection)AsyncProcess.this.hConnection) + .getStatisticsTracker(); + ServerStatistics stats = tracker.getStats(location.getServerName()); + return ((StatisticsHConnection)AsyncProcess.this.hConnection).getBackoffPolicy() + .getBackoffTime(location.getServerName(), location.getRegionInfo().getRegionName(), + stats); + } + + /** * Create a callable. Isolated to be easily overridden in the tests. */ protected MultiServerCallable createCallable(final HRegionLocation location, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bea2a1b..7821f46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -107,7 +107,10 @@ public class ClientScanner extends AbstractClientScanner { */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf), + this(conf, scan, tableName, connection, + RpcRetryingCallerFactory.instantiate(conf, + connection instanceof StatisticsHConnection ? + ((StatisticsHConnection)connection).getStatisticsTracker() : null), RpcControllerFactory.instantiate(conf)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 31df3b0..ad9380f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -83,7 +83,9 @@ public class ClientSmallScanner extends ClientScanner { */ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, HConnection connection) throws IOException { - this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf), + this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf, + connection instanceof StatisticsHConnection ? + ((StatisticsHConnection)connection).getStatisticsTracker() : null), RpcControllerFactory.instantiate(conf)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java new file mode 100644 index 0000000..22c0461 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * A wrapper for a runnable for a group of actions for a single regionserver. + *

+ * This can be used to build up the actions that should be taken and then + *

+ *

+ * This class exists to simulate using a ScheduledExecutorService with just a regular + * ExecutorService and Runnables. It is used for legacy reasons in the the client; this could + * only be removed if we change the expectations in HTable around the pool the client is able to + * pass in and even if we deprecate the current APIs would require keeping this class around + * for the interim to bridge between the legacy ExecutorServices and the scheduled pool. + *

+ */ +@InterfaceAudience.Private +public class DelayingRunner implements Runnable { + private static final Log LOG = LogFactory.getLog(DelayingRunner.class); + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + private long sleepTime; + private MultiAction actions = new MultiAction(); + private Runnable runnable; + + public DelayingRunner(long sleepTime, Entry>> e) { + this.sleepTime = sleepTime; + add(e); + } + + public void setRunner(Runnable runner) { + this.runnable = runner; + } + + @Override + public void run() { + if (!sleep()) { + LOG.warn( + "Interrupted while sleeping for expected sleep time " + sleepTime + " ms"); + } + //TODO maybe we should consider switching to a listenableFuture for the actual callable and + // then handling the results/errors as callbacks. That way we can decrement outstanding tasks + // even if we get interrupted here, but for now, we still need to run so we decrement the + // outstanding tasks + this.runnable.run(); + } + + /** + * Sleep for an expected amount of time. + *

+ * This is nearly a copy of what the Sleeper does, but with the ability to know if you + * got interrupted while sleeping. + *

+ * + * @return true if the sleep completely entirely successfully, + * but otherwise false if the sleep was interrupted. + */ + private boolean sleep() { + long now = EnvironmentEdgeManager.currentTimeMillis(); + long startTime = now; + long waitTime = sleepTime; + while (waitTime > 0) { + long woke = -1; + try { + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } + woke = EnvironmentEdgeManager.currentTimeMillis(); + } catch (InterruptedException iex) { + return false; + } + // Recalculate waitTime. + woke = (woke == -1) ? EnvironmentEdgeManager.currentTimeMillis() : woke; + waitTime = waitTime - (woke - startTime); + } + return true; + } + + public void add(Map.Entry>> e) { + actions.add(e.getKey(), e.getValue()); + } + + public MultiAction getActions() { + return actions; + } + + public long getSleepTime() { + return sleepTime; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index effa069..e3af5e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -214,7 +214,9 @@ public class HBaseAdmin implements Abortable, Closeable { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.retryLongerMultiplier = this.conf.getInt( "hbase.client.retries.longer.multiplier", 10); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf, + connection instanceof StatisticsHConnection ? + ((StatisticsHConnection)connection).getStatisticsTracker() : null); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index da7a122..b06dc6f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; @@ -592,7 +594,7 @@ public class HConnectionManager { @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") - public static class HConnectionImplementation implements HConnection, Closeable { + public static class HConnectionImplementation implements StatisticsHConnection, Closeable { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final long pause; private final int numTries; @@ -664,6 +666,11 @@ public class HConnectionManager { private RpcControllerFactory rpcControllerFactory; + // single tracker per connection + private final ServerStatisticTracker stats; + + private final ClientBackoffPolicy backoffPolicy; + /** * Cluster registry of 
basic info such as clusterid and meta region location. */ @@ -720,7 +727,7 @@ public class HConnectionManager { } } - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); } @@ -760,13 +767,16 @@ public class HConnectionManager { this.nonceGenerator = new NoNonceGenerator(); } + this.stats = ServerStatisticTracker.create(conf); this.usePrefetch = conf.getBoolean(HConstants.HBASE_CLIENT_PREFETCH, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH); this.prefetchRegionLimit = conf.getInt( HConstants.HBASE_CLIENT_PREFETCH_LIMIT, HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + } @Override @@ -2418,11 +2428,12 @@ public class HConnectionManager { // For tests. protected AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool, AsyncProcess.AsyncProcessCallback callback, Configuration conf) { - return new AsyncProcess(this, tableName, pool, callback, conf, - RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); + RpcControllerFactory controllerFactory = RpcControllerFactory.instantiate(conf); + RpcRetryingCallerFactory callerFactory = RpcRetryingCallerFactory.instantiate(conf, this.stats); + return new AsyncProcess(this, tableName, pool, callback, conf, callerFactory, + controllerFactory); } - /** * Fill the result array for the interfaces using it. */ @@ -2461,6 +2472,15 @@ public class HConnectionManager { } } + @Override + public ServerStatisticTracker getStatisticsTracker() { + return this.stats; + } + + @Override + public ClientBackoffPolicy getBackoffPolicy() { + return this.backoffPolicy; + } /* * Return the number of cached region for a table. It will only be called diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1d8a037..9cc6d7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -379,7 +379,9 @@ public class HTable implements HTableInterface { this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, + this.connection instanceof StatisticsHConnection ? 
+ ((StatisticsHConnection)this.connection).getStatisticsTracker() : null); } if (this.rpcControllerFactory == null) { this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 709e5fd..0fcaa03 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -406,14 +406,16 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); - + public FlushWorker(Configuration conf, HConnection conn, HRegionLocation addr, HTableMultiplexer multiplexer, int perRegionServerBufferQueueSize, ExecutorService pool, ScheduledExecutorService executor) { this.addr = addr; this.multiplexer = multiplexer; this.queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, + conn instanceof StatisticsHConnection ? + ((StatisticsHConnection)conn).getStatisticsTracker() : null); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.ap = new AsyncProcess(conn, null, pool, this, conf, rpcCallerFactory, rpcControllerFactory); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index e1cfc9a..0af351f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -76,6 +76,23 @@ public final class MultiAction { rsActions.add(a); } + /** + * Add an list of Actions to this container based on it's regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region. + * + * @param regionName + * @param a + */ + public void add(byte[] regionName, List> a) { + List> rsActions = actions.get(regionName); + if (rsActions == null) { + rsActions = new ArrayList>(a.size()); + actions.put(regionName, rsActions); + } + rsActions.addAll(a); + } + public void setNonceGroup(long nonceGroup) { this.nonceGroup = nonceGroup; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index b995e28..c220718 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** @@ -93,6 +94,7 @@ public class Result implements CellScannable, CellScanner { * Index for where we are when Result is acting as a {@link CellScanner}. 
*/ private int cellScannerIndex = INITIAL_CELLSCANNER_INDEX; + private ClientProtos.RegionLoadStats loadStats; /** * Creates an empty Result w/ no KeyValue payload; returns null if you call {@link #rawCells()}. @@ -839,4 +841,19 @@ public class Result implements CellScannable, CellScanner { this.exists = exists; } + /** + * Add load information about the region to the information about the result + * @param loadStats statistics about the current region from which this was returned + */ + public void addResults(ClientProtos.RegionLoadStats loadStats) { + this.loadStats = loadStats; + } + + /** + * @return the associated statistics about the region from which this was returned. Can be + * null if stats are disabled. + */ + public ClientProtos.RegionLoadStats getStats() { + return loadStats; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java new file mode 100644 index 0000000..3ecafe1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultStatsUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Result} with some statistics about the server/region status + */ +@InterfaceAudience.Private +public final class ResultStatsUtil { + + private ResultStatsUtil() { + //private ctor for util class + } + + /** + * Update the stats for the specified region if the result is an instance of {@link + * ResultStatsUtil} + * + * @param r object that contains the result and possibly the statistics about the region + * @param serverStats stats tracker to update from the result + * @param server server from which the result was obtained + * @param regionName full region name for the stats. 
+ * @return the underlying {@link Result} if the passed result is an {@link + * ResultStatsUtil} or just returns the result; + */ + public static T updateStats(T r, ServerStatisticTracker serverStats, + ServerName server, byte[] regionName) { + if (!(r instanceof Result)) { + return r; + } + Result result = (Result) r; + // early exit if there are no stats to collect + ClientProtos.RegionLoadStats stats = result.getStats(); + if(stats == null){ + return r; + } + + if (regionName != null) { + serverStats.updateRegionStats(server, regionName, stats); + } + + return r; + } + + public static T updateStats(T r, ServerStatisticTracker stats, + HRegionLocation regionLocation) { + byte[] regionName = null; + ServerName server = null; + if (regionLocation != null) { + server = regionLocation.getServerName(); + regionName = regionLocation.getRegionInfo().getRegionName(); + } + + return updateStats(r, stats, server, regionName); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 9552992..95fbb92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -39,9 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import com.google.protobuf.ServiceException; /** - * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client - * threadlocal outstanding timeouts as so we don't persist too much. - * Dynamic rather than static so can set the generic appropriately. + * Dynamic rather than static so can set the generic return type appropriately. */ @InterfaceAudience.Private @edu.umd.cs.findbugs.annotations.SuppressWarnings diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 53d5c58..8d8c72c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -32,31 +33,61 @@ public class RpcRetryingCallerFactory { private final long pause; private final int retries; private final int startLogErrorsCnt; + private final boolean enableBackPressure; + private ServerStatisticTracker stats; public RpcRetryingCallerFactory(Configuration conf) { + this(conf, null); + } + + public RpcRetryingCallerFactory(Configuration conf, ServerStatisticTracker stats) { this.conf = conf; + this.stats = stats; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + } + + /** + * Set the tracker that should be used for tracking statistics about the server + */ + public void setStatisticTracker(ServerStatisticTracker statisticTracker) { 
+ this.stats = statisticTracker; } public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - return new RpcRetryingCaller(pause, retries, startLogErrorsCnt); + RpcRetryingCaller caller; + if (enableBackPressure && this.stats != null) { + caller = new StatsTrackingRpcRetryingCaller(pause, retries, startLogErrorsCnt, + this.stats); + } else { + caller = new RpcRetryingCaller(pause, retries, startLogErrorsCnt); + } + return caller; } - public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + ServerStatisticTracker stats) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); if (rpcCallerFactoryClazz.equals(clazzName)) { - return new RpcRetryingCallerFactory(configuration); + return new RpcRetryingCallerFactory(configuration, stats); + } + try { + return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, + new Class[] { Configuration.class, ServerStatisticTracker.class }, + new Object[] { configuration, stats }); + } catch (UnsupportedOperationException e) { + return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, + new Class[] { Configuration.class }, new Object[] { configuration }); } - return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java new file mode 100644 index 0000000..0c7b683 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the statistics for multiple regions + */ +@InterfaceAudience.Private +public class ServerStatisticTracker { + + private final Map stats = + new ConcurrentHashMap(); + + public void updateRegionStats(ServerName server, byte[] region, ClientProtos.RegionLoadStats + currentStats) { + ServerStatistics stat = stats.get(server); + + if (stat == null) { + // create a stats object and update the stats + synchronized (this) { + stat = stats.get(server); + // we don't have stats for that server yet, so we need to make some + if (stat == null) { + stat = new ServerStatistics(); + stats.put(server, stat); + } + } + } + stat.update(region, currentStats); + } + + public ServerStatistics getStats(ServerName server) { + return this.stats.get(server); + } + + public static ServerStatisticTracker create(Configuration conf) { + if (!conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE)) { + return null; + } + return new ServerStatisticTracker(); + } + + @VisibleForTesting + ServerStatistics getServerStatsForTesting(ServerName server) { + return stats.get(server); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java new file mode 100644 index 0000000..79bf9f7 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatisticsHConnection.java @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; + +/** + * A server statistics tracking aware HConnection. 
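+ *
+ * Callers feature-test a connection for this interface before wiring statistics into the
+ * caller factory; a representative sketch of the pattern used by the HTable and HBaseAdmin
+ * changes in this patch:
+ *
+ * <pre>
+ * ServerStatisticTracker stats = connection instanceof StatisticsHConnection
+ *     ? ((StatisticsHConnection) connection).getStatisticsTracker() : null;
+ * RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, stats);
+ * </pre>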
+ */
+@InterfaceAudience.Private
+public interface StatisticsHConnection extends HConnection {
+  /**
+   * @return the current statistics tracker associated with this connection
+   */
+  ServerStatisticTracker getStatisticsTracker();
+
+  /**
+   * @return the configured client backoff policy
+   */
+  ClientBackoffPolicy getBackoffPolicy();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
new file mode 100644
index 0000000..bf80a62
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/StatsTrackingRpcRetryingCaller.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+/**
+ * An {@link RpcRetryingCaller} that will update the per-region stats for the call on return,
+ * if stats are available.
+ */
+@InterfaceAudience.Private
+public class StatsTrackingRpcRetryingCaller extends RpcRetryingCaller {
+  private final ServerStatisticTracker stats;
+
+  public StatsTrackingRpcRetryingCaller(long pause, int retries, int startLogErrorsCnt,
+      ServerStatisticTracker stats) {
+    super(pause, retries, startLogErrorsCnt);
+    this.stats = stats;
+  }
+
+  @Override
+  public <T> T callWithRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = super.callWithRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  @Override
+  public <T> T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
+      throws IOException, RuntimeException {
+    T result = super.callWithoutRetries(callable, callTimeout);
+    return updateStatsAndUnwrap(result, callable);
+  }
+
+  private <T> T updateStatsAndUnwrap(T result, RetryingCallable<T> callable) {
+    // don't track stats about requests that aren't to regionservers
+    if (!(callable instanceof RegionServerCallable)) {
+      return result;
+    }
+
+    // multi-server callables span multiple regions, so they don't have a location,
+    // but they are region server callables, so we have to handle them when we process the
+    // result, not in here
+    if (callable instanceof MultiServerCallable) {
+      return result;
+    }
+
+    // update the stats for the single server callable
+    RegionServerCallable regionCallable = (RegionServerCallable) callable;
+    HRegionLocation location = regionCallable.getLocation();
+    return ResultStatsUtil.updateStats(result, stats, location);
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java new file mode 100644 index 0000000..94e434f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Configurable policy for the amount of time a client should wait for a new request to the + * server when given the server load statistics. + *

+ * Must have a single-argument constructor that takes a {@link org.apache.hadoop.conf.Configuration} + *

+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ClientBackoffPolicy {
+
+  public static final String BACKOFF_POLICY_CLASS =
+      "hbase.client.statistics.backoff-policy";
+
+  /**
+   * @return the number of ms to wait on the client based on the current load statistics for
+   *         the given server and region
+   */
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats);
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
new file mode 100644
index 0000000..879a0e2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ClientBackoffPolicyFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ClientBackoffPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(ClientBackoffPolicyFactory.class);
+
+  private ClientBackoffPolicyFactory() {
+  }
+
+  public static ClientBackoffPolicy create(Configuration conf) {
+    // create the backoff policy
+    String className =
+        conf.get(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, NoBackoffPolicy.class
+            .getName());
+    return ReflectionUtils.instantiateWithCustomCtor(className,
+        new Class[] { Configuration.class }, new Object[] { conf });
+  }
+
+  /**
+   * Default backoff policy that doesn't create any backoff for the client, regardless of load
+   */
+  public static class NoBackoffPolicy implements ClientBackoffPolicy {
+    public NoBackoffPolicy(Configuration conf) {
+      // necessary to meet contract
+    }
+
+    @Override
+    public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+      return 0;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
new file mode 100644
index 0000000..6e75670
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ExponentialClientBackoffPolicy.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.backoff;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Simple exponential backoff policy for the client that uses percent^4 times the
+ * max backoff to generate the backoff time.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExponentialClientBackoffPolicy implements ClientBackoffPolicy {
+
+  private static final Log LOG = LogFactory.getLog(ExponentialClientBackoffPolicy.class);
+
+  private static final long ONE_MINUTE = 60 * 1000;
+  public static final long DEFAULT_MAX_BACKOFF = 5 * ONE_MINUTE;
+  public static final String MAX_BACKOFF_KEY = "hbase.client.exponential-backoff.max";
+  private long maxBackoff;
+
+  public ExponentialClientBackoffPolicy(Configuration conf) {
+    this.maxBackoff = conf.getLong(MAX_BACKOFF_KEY, DEFAULT_MAX_BACKOFF);
+  }
+
+  @Override
+  public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) {
+    // no stats for the server yet, so don't backoff
+    if (stats == null) {
+      return 0;
+    }
+
+    ServerStatistics.RegionStatistics regionStats = stats.getStatsForRegion(region);
+    // no stats for the region yet - don't backoff
+    if (regionStats == null) {
+      return 0;
+    }
+
+    // Raise the percent, expressed as a value less than 1, to the fourth power. The closer the
+    // load moves to 100 percent, the closer the multiplier moves to 1, and raising it to a
+    // power gives the curve its exponential shape.
+    double percent = regionStats.getMemstoreLoadPercent() / 100.0;
+    double multiplier = Math.pow(percent, 4.0);
+    // shouldn't ever happen, but just in case something changes in the statistic data
+    if (multiplier > 1) {
+      LOG.warn("Somehow got a backoff multiplier greater than the allowed backoff. Forcing back "
+          + "down to the max backoff");
+      multiplier = 1;
+    }
+    return (long) (multiplier * maxBackoff);
+  }
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
new file mode 100644
index 0000000..a3b8e11
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/ServerStatistics.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.backoff; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Track the statistics for a single region + */ +@InterfaceAudience.Private +public class ServerStatistics { + + private Map + stats = new TreeMap(Bytes.BYTES_COMPARATOR); + + /** + * Good enough attempt. Last writer wins. It doesn't really matter which one gets to update, + * as something gets set + * @param region + * @param currentStats + */ + public void update(byte[] region, ClientProtos.RegionLoadStats currentStats) { + RegionStatistics regionStat = this.stats.get(region); + if(regionStat == null){ + regionStat = new RegionStatistics(); + this.stats.put(region, regionStat); + } + + regionStat.update(currentStats); + } + + @InterfaceAudience.Private + public RegionStatistics getStatsForRegion(byte[] regionName){ + return stats.get(regionName); + } + + public static class RegionStatistics{ + private int memstoreLoad = 0; + + public void update(ClientProtos.RegionLoadStats currentStats) { + this.memstoreLoad = currentStats.getMemstoreLoad(); + } + + public int getMemstoreLoadPercent(){ + return this.memstoreLoad; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index 367d5f9..887c4c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -116,19 +116,23 @@ public final class ResponseConverter { } for (ResultOrException roe : actionResult.getResultOrExceptionList()) { + Object responseValue; if (roe.hasException()) { - results.add(regionName, new Pair(roe.getIndex(), - ProtobufUtil.toException(roe.getException()))); + responseValue = ProtobufUtil.toException(roe.getException()); } else if (roe.hasResult()) { - results.add(regionName, new Pair(roe.getIndex(), - ProtobufUtil.toResult(roe.getResult(), cells))); + responseValue = ProtobufUtil.toResult(roe.getResult(), cells); + // add the load stats, if we got any + if (roe.hasLoadStats()) { + ((Result) responseValue).addResults(roe.getLoadStats()); + } } else if (roe.hasServiceResult()) { - results.add(regionName, roe.getIndex(), roe.getServiceResult()); + responseValue = roe.getServiceResult(); } else { // no result & no exception. Unexpected. throw new IllegalStateException("No result & no exception roe=" + roe + " for region " + actions.getRegion()); } + results.add(regionName, roe.getIndex(), responseValue); } } @@ -151,11 +155,14 @@ public final class ResponseConverter { * Wrap a throwable to an action result. 
* * @param r + * @param stats * @return an action result builder */ - public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) { + public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r, + ClientProtos.RegionLoadStats stats) { ResultOrException.Builder builder = ResultOrException.newBuilder(); if (r != null) builder.setResult(r); + if (stats != null) builder.setLoadStats(stats); return builder; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java new file mode 100644 index 0000000..24cb661 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientExponentialBackoff.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestClientExponentialBackoff { + + ServerName server = Mockito.mock(ServerName.class); + byte[] regionname = Bytes.toBytes("region"); + + @Test + public void testNulls() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + assertEquals(0, backoff.getBackoffTime(null, null, null)); + + // server name doesn't matter to calculation, but check it now anyways + assertEquals(0, backoff.getBackoffTime(server, null, null)); + assertEquals(0, backoff.getBackoffTime(server, regionname, null)); + + // check when no stats for the region yet + ServerStatistics stats = new ServerStatistics(); + assertEquals(0, backoff.getBackoffTime(server, regionname, stats)); + } + + @Test + public void testMaxLoad() { + Configuration conf = new Configuration(false); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + update(stats, 100); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + + // another policy with a different max timeout + long max = 100; + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, max); + ExponentialClientBackoffPolicy backoffShortTimeout = new ExponentialClientBackoffPolicy(conf); + 
assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // test beyond 100 still doesn't exceed the max + update(stats, 101); + assertEquals(ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF, backoff.getBackoffTime(server, + regionname, stats)); + assertEquals(max, backoffShortTimeout.getBackoffTime(server, regionname, stats)); + + // and that when we are below 100, it's less than the max timeout + update(stats, 99); + assertTrue(backoff.getBackoffTime(server, + regionname, stats) < ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF); + assertTrue(backoffShortTimeout.getBackoffTime(server, regionname, stats) < max); + } + + /** + * Make sure that we get results in the order that we expect - backoff for a load of 1 should be + * less than backoff for 10, which should be less than that for 50. + */ + @Test + public void testResultOrdering() { + Configuration conf = new Configuration(false); + // make the max timeout really high so we get differentiation between load factors + conf.setLong(ExponentialClientBackoffPolicy.MAX_BACKOFF_KEY, Integer.MAX_VALUE); + ExponentialClientBackoffPolicy backoff = new ExponentialClientBackoffPolicy(conf); + + ServerStatistics stats = new ServerStatistics(); + long previous = backoff.getBackoffTime(server, regionname, stats); + for (int i = 1; i <= 100; i++) { + update(stats, i); + long next = backoff.getBackoffTime(server, regionname, stats); + assertTrue( + "Previous backoff time " + previous + " >= " + next + ", the next backoff time for " + + "load " + i, previous < next); + previous = next; + } + } + + private void update(ServerStatistics stats, int load) { + ClientProtos.RegionLoadStats stat = ClientProtos.RegionLoadStats.newBuilder() + .setMemstoreLoad(load) + .build(); + stats.update(regionname, stat); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 323ec8b..f02f0ac 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1026,6 +1026,11 @@ public final class HConstants { /** Configuration key for setting replication codec class name */ public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec"; + /** Config key for whether the server should send backpressure and whether the client should + * listen to that backpressure from the server */ + public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; + public static final boolean DEFAULT_ENABLE_CLIENT_BACKPRESSURE = false; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 5e23bb5..fc776ea 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -25713,6 +25713,482 @@ public final class ClientProtos { // @@protoc_insertion_point(class_scope:RegionAction) } + public interface RegionLoadStatsOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // optional int32 memstoreLoad = 1 [default = 0]; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + boolean hasMemstoreLoad(); + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + int getMemstoreLoad(); + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+   *
+   * Statistics about the current load on the region
+   * 
+ */ + public static final class RegionLoadStats extends + com.google.protobuf.GeneratedMessage + implements RegionLoadStatsOrBuilder { + // Use RegionLoadStats.newBuilder() to construct. + private RegionLoadStats(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private RegionLoadStats(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final RegionLoadStats defaultInstance; + public static RegionLoadStats getDefaultInstance() { + return defaultInstance; + } + + public RegionLoadStats getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private RegionLoadStats( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + memstoreLoad_ = input.readInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public RegionLoadStats parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new RegionLoadStats(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional int32 memstoreLoad = 1 [default = 0]; + public static final int MEMSTORELOAD_FIELD_NUMBER = 1; + private int memstoreLoad_; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+     * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+     * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + + private void initFields() { + memstoreLoad_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt32(1, memstoreLoad_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, memstoreLoad_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) obj; + + boolean result = true; + result = result && (hasMemstoreLoad() == other.hasMemstoreLoad()); + if (hasMemstoreLoad()) { + result = result && (getMemstoreLoad() + == other.getMemstoreLoad()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasMemstoreLoad()) { + hash = (37 * hash) + MEMSTORELOAD_FIELD_NUMBER; + hash = (53 * hash) + getMemstoreLoad(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom(java.io.InputStream 
input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code RegionLoadStats} + * + *
+     *
+     * Statistics about the current load on the region
+     * 
+ */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + memstoreLoad_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.internal_static_RegionLoadStats_descriptor; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats build() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats result = new org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.memstoreLoad_ = memstoreLoad_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) return this; + if (other.hasMemstoreLoad()) { + setMemstoreLoad(other.getMemstoreLoad()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + 
public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional int32 memstoreLoad = 1 [default = 0]; + private int memstoreLoad_ ; + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public boolean hasMemstoreLoad() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public int getMemstoreLoad() { + return memstoreLoad_; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder setMemstoreLoad(int value) { + bitField0_ |= 0x00000001; + memstoreLoad_ = value; + onChanged(); + return this; + } + /** + * optional int32 memstoreLoad = 1 [default = 0]; + * + *
+       * percent load on the memstore. Guaranteed to be positive, between 0 and 100
+       * 
+ */ + public Builder clearMemstoreLoad() { + bitField0_ = (bitField0_ & ~0x00000001); + memstoreLoad_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RegionLoadStats) + } + + static { + defaultInstance = new RegionLoadStats(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RegionLoadStats) + } + public interface ResultOrExceptionOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -25789,6 +26265,32 @@ public final class ClientProtos { * */ org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResultOrBuilder getServiceResultOrBuilder(); + + // optional .RegionLoadStats loadStats = 5; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + boolean hasLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats(); + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder(); } /** * Protobuf type {@code ResultOrException} @@ -25892,6 +26394,19 @@ public final class ClientProtos { bitField0_ |= 0x00000008; break; } + case 42: { + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder subBuilder = null; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + subBuilder = loadStats_.toBuilder(); + } + loadStats_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(loadStats_); + loadStats_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000010; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -26036,11 +26551,46 @@ public final class ClientProtos { return serviceResult_; } + // optional .RegionLoadStats loadStats = 5; + public static final int LOADSTATS_FIELD_NUMBER = 5; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + return loadStats_; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+     * current load on the region
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + return loadStats_; + } + private void initFields() { index_ = 0; result_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result.getDefaultInstance(); exception_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair.getDefaultInstance(); serviceResult_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResult.getDefaultInstance(); + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -26078,6 +26628,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeMessage(5, loadStats_); + } getUnknownFields().writeTo(output); } @@ -26103,6 +26656,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, serviceResult_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(5, loadStats_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -26146,6 +26703,11 @@ public final class ClientProtos { result = result && getServiceResult() .equals(other.getServiceResult()); } + result = result && (hasLoadStats() == other.hasLoadStats()); + if (hasLoadStats()) { + result = result && getLoadStats() + .equals(other.getLoadStats()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -26175,6 +26737,10 @@ public final class ClientProtos { hash = (37 * hash) + SERVICE_RESULT_FIELD_NUMBER; hash = (53 * hash) + getServiceResult().hashCode(); } + if (hasLoadStats()) { + hash = (37 * hash) + LOADSTATS_FIELD_NUMBER; + hash = (53 * hash) + getLoadStats().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -26286,6 +26852,7 @@ public final class ClientProtos { getResultFieldBuilder(); getExceptionFieldBuilder(); getServiceResultFieldBuilder(); + getLoadStatsFieldBuilder(); } } private static Builder create() { @@ -26314,6 +26881,12 @@ public final class ClientProtos { serviceResultBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -26370,6 +26943,14 @@ public final class ClientProtos { } else { result.serviceResult_ = serviceResultBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + if (loadStatsBuilder_ == null) { + result.loadStats_ = loadStats_; + } else { + result.loadStats_ = loadStatsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -26398,6 +26979,9 @@ public final class ClientProtos { if (other.hasServiceResult()) { mergeServiceResult(other.getServiceResult()); } + if (other.hasLoadStats()) { + mergeLoadStats(other.getLoadStats()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -26877,6 +27461,159 @@ public final class ClientProtos { return serviceResultBuilder_; } + // optional .RegionLoadStats loadStats = 5; + private 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> loadStatsBuilder_; + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public boolean hasLoadStats() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats getLoadStats() { + if (loadStatsBuilder_ == null) { + return loadStats_; + } else { + return loadStatsBuilder_.getMessage(); + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + loadStats_ = value; + onChanged(); + } else { + loadStatsBuilder_.setMessage(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder setLoadStats( + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder builderForValue) { + if (loadStatsBuilder_ == null) { + loadStats_ = builderForValue.build(); + onChanged(); + } else { + loadStatsBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder mergeLoadStats(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats value) { + if (loadStatsBuilder_ == null) { + if (((bitField0_ & 0x00000010) == 0x00000010) && + loadStats_ != org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance()) { + loadStats_ = + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.newBuilder(loadStats_).mergeFrom(value).buildPartial(); + } else { + loadStats_ = value; + } + onChanged(); + } else { + loadStatsBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000010; + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public Builder clearLoadStats() { + if (loadStatsBuilder_ == null) { + loadStats_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.getDefaultInstance(); + onChanged(); + } else { + loadStatsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder getLoadStatsBuilder() { + bitField0_ |= 0x00000010; + onChanged(); + return getLoadStatsFieldBuilder().getBuilder(); + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder getLoadStatsOrBuilder() { + if (loadStatsBuilder_ != null) { + return loadStatsBuilder_.getMessageOrBuilder(); + } else { + return loadStats_; + } + } + /** + * optional .RegionLoadStats loadStats = 5; + * + *
+       * current load on the region
+       * 
+ */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder> + getLoadStatsFieldBuilder() { + if (loadStatsBuilder_ == null) { + loadStatsBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats.Builder, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStatsOrBuilder>( + loadStats_, + getParentForChildren(), + isClean()); + loadStats_ = null; + } + return loadStatsBuilder_; + } + // @@protoc_insertion_point(builder_scope:ResultOrException) } @@ -30570,6 +31307,11 @@ public final class ClientProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RegionAction_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor + internal_static_RegionLoadStats_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RegionLoadStats_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor internal_static_ResultOrException_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable @@ -30680,30 +31422,32 @@ public final class ClientProtos { "(\0132\004.Get\022-\n\014service_call\030\004 \001(\0132\027.Coproce", "ssorServiceCall\"Y\n\014RegionAction\022 \n\006regio" + "n\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006atomic\030\002 \001(" + - "\010\022\027\n\006action\030\003 \003(\0132\007.Action\"\221\001\n\021ResultOrE" + - "xception\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 \001(\0132" + - "\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBytesP" + - "air\0221\n\016service_result\030\004 \001(\0132\031.Coprocesso" + - "rServiceResult\"f\n\022RegionActionResult\022-\n\021" + - "resultOrException\030\001 \003(\0132\022.ResultOrExcept" + - "ion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"f" + - "\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r.R", - "egionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcondi" + - "tion\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse\022" + - "/\n\022regionActionResult\030\001 \003(\0132\023.RegionActi" + - "onResult\022\021\n\tprocessed\030\002 \001(\0102\205\003\n\rClientSe" + - "rvice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022" + - ")\n\006Mutate\022\016.MutateRequest\032\017.MutateRespon" + - "se\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse\022" + - ">\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest\032" + - "\026.BulkLoadHFileResponse\022F\n\013ExecService\022\032" + - ".CoprocessorServiceRequest\032\033.Coprocessor", - "ServiceResponse\022R\n\027ExecRegionServerServi" + - "ce\022\032.CoprocessorServiceRequest\032\033.Coproce" + - "ssorServiceResponse\022&\n\005Multi\022\r.MultiRequ" + - "est\032\016.MultiResponseBB\n*org.apache.hadoop" + - ".hbase.protobuf.generatedB\014ClientProtosH" + - "\001\210\001\001\240\001\001" + "\010\022\027\n\006action\030\003 \003(\0132\007.Action\"*\n\017RegionLoad" + + "Stats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010\"\266\001\n\021Resul" + + "tOrException\022\r\n\005index\030\001 \001(\r\022\027\n\006result\030\002 " + + "\001(\0132\007.Result\022!\n\texception\030\003 \001(\0132\016.NameBy" + + 
"tesPair\0221\n\016service_result\030\004 \001(\0132\031.Coproc" + + "essorServiceResult\022#\n\tloadStats\030\005 \001(\0132\020." + + "RegionLoadStats\"f\n\022RegionActionResult\022-\n" + + "\021resultOrException\030\001 \003(\0132\022.ResultOrExcep", + "tion\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + + "f\n\014MultiRequest\022#\n\014regionAction\030\001 \003(\0132\r." + + "RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022\035\n\tcond" + + "ition\030\003 \001(\0132\n.Condition\"S\n\rMultiResponse" + + "\022/\n\022regionActionResult\030\001 \003(\0132\023.RegionAct" + + "ionResult\022\021\n\tprocessed\030\002 \001(\0102\205\003\n\rClientS" + + "ervice\022 \n\003Get\022\013.GetRequest\032\014.GetResponse" + + "\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateRespo" + + "nse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanResponse" + + "\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRequest", + "\032\026.BulkLoadHFileResponse\022F\n\013ExecService\022" + + "\032.CoprocessorServiceRequest\032\033.Coprocesso" + + "rServiceResponse\022R\n\027ExecRegionServerServ" + + "ice\022\032.CoprocessorServiceRequest\032\033.Coproc" + + "essorServiceResponse\022&\n\005Multi\022\r.MultiReq" + + "uest\032\016.MultiResponseBB\n*org.apache.hadoo" + + "p.hbase.protobuf.generatedB\014ClientProtos" + + "H\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -30860,26 +31604,32 @@ public final class ClientProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionAction_descriptor, new java.lang.String[] { "Region", "Atomic", "Action", }); - internal_static_ResultOrException_descriptor = + internal_static_RegionLoadStats_descriptor = getDescriptor().getMessageTypes().get(22); + internal_static_RegionLoadStats_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RegionLoadStats_descriptor, + new java.lang.String[] { "MemstoreLoad", }); + internal_static_ResultOrException_descriptor = + getDescriptor().getMessageTypes().get(23); internal_static_ResultOrException_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ResultOrException_descriptor, - new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", }); + new java.lang.String[] { "Index", "Result", "Exception", "ServiceResult", "LoadStats", }); internal_static_RegionActionResult_descriptor = - getDescriptor().getMessageTypes().get(23); + getDescriptor().getMessageTypes().get(24); internal_static_RegionActionResult_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RegionActionResult_descriptor, new java.lang.String[] { "ResultOrException", "Exception", }); internal_static_MultiRequest_descriptor = - getDescriptor().getMessageTypes().get(24); + getDescriptor().getMessageTypes().get(25); internal_static_MultiRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiRequest_descriptor, new java.lang.String[] { "RegionAction", "NonceGroup", "Condition", }); internal_static_MultiResponse_descriptor = - getDescriptor().getMessageTypes().get(25); + getDescriptor().getMessageTypes().get(26); internal_static_MultiResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MultiResponse_descriptor, diff --git 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java index e52ad17..1fae9b9 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java @@ -44066,6 +44066,8 @@ public final class MasterProtos { } } + + // @@protoc_insertion_point(class_scope:MasterService) } private static com.google.protobuf.Descriptors.Descriptor diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java index 294772e..70593b0 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/VisibilityLabelsProtos.java @@ -5092,7 +5092,7 @@ public final class VisibilityLabelsProtos { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -5108,7 +5108,7 @@ public final class VisibilityLabelsProtos { getRegexBytes() { java.lang.Object ref = regex_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); regex_ = b; @@ -5414,7 +5414,7 @@ public final class VisibilityLabelsProtos { getRegexBytes() { java.lang.Object ref = regex_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); regex_ = b; diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 3b5627b..a648b1a 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -338,6 +338,14 @@ message RegionAction { repeated Action action = 3; } +/* +* Statistics about the current load on the region +*/ +message RegionLoadStats{ + // percent load on the memstore. Guaranteed to be positive, between 0 and 100 + optional int32 memstoreLoad = 1 [default = 0]; +} + /** * Either a Result or an Exception NameBytesPair (keyed by * exception name whose value is the exception stringified) @@ -351,6 +359,8 @@ message ResultOrException { optional NameBytesPair exception = 3; // result if this was a coprocessor service call optional CoprocessorServiceResult service_result = 4; + // current load on the region + optional RegionLoadStats loadStats = 5; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TestClientPushback.java new file mode 100644 index 0000000..f206854 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test that we can actually send and use region metrics to slow down client writes + */ +@Category(MediumTests.class) +public class TestClientPushback { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] tableName = Bytes.toBytes("client-pushback"); + private static final byte[] family = Bytes.toBytes("f"); + private static final byte[] qualifier = Bytes.toBytes("q"); + private static long flushSizeBytes = 1024; + + @BeforeClass + public static void setupCluster() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + // enable backpressure + conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + // turn the memstore size way down so we don't need to write a lot to see + // changes in memstore load + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes); + // ensure we block the flushes when we are double that flush size + conf.setLong("hbase.hregion.memstore.block.multiplier", 2); + + UTIL.startMiniCluster(); + UTIL.createTable(tableName, family); + } + + @AfterClass + public static void teardownCluster() throws Exception{ + UTIL.shutdownMiniCluster(); + } + + @Test + public void testClientTracksServerPushback() throws Exception{ + Configuration conf = UTIL.getConfiguration(); + // The cast here also asserts that HConnectionManager is creating + // connections of the correct type + StatisticsHConnection conn = (StatisticsHConnection) + HConnectionManager.createConnection(conf); + TableName tablename = TableName.valueOf(tableName); + HTableInterface table = conn.getTable(tablename); + try { + // write some data + Put p = new Put(Bytes.toBytes("row")); + p.add(family, qualifier, Bytes.toBytes("value1")); + table.put(p); + } finally { + table.close(); + } + ServerStatisticTracker stats = conn.getStatisticsTracker(); + assertNotNull("No stats configured for the client!", stats); + // get the names so we can query the stats + ServerName server = UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + byte[] regionName = UTIL.getHBaseCluster().getRegionServer(0).getOnlineRegions(tablename) + .get(0).getRegionName(); + + // check to see that we found some load on the memstore + ServerStatistics serverStats = stats.getServerStatsForTesting(server); + assertNotNull("No stats for server " + server, serverStats); + ServerStatistics.RegionStatistics regionStats =
serverStats.getStatsForRegion(regionName); + assertEquals(15, regionStats.getMemstoreLoadPercent()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 1d8282d..8ef988e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -682,7 +682,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>(); Configuration conf = getConf(); - boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller() + boolean success = RpcRetryingCallerFactory.instantiate(conf, null).<Boolean> newCaller() .callWithRetries(svrCallable); if (!success) { LOG.warn("Attempt to bulk load region containing " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 75e26e1..ce23111 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; @@ -120,6 +121,7 @@ import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -517,6 +519,7 @@ public class HRegion implements HeapSize { // , Writable{ private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; private final Durability durability; + private final boolean regionStatsEnabled; /** * HRegion constructor. This constructor should only be used for testing and @@ -659,6 +662,12 @@ public class HRegion implements HeapSize { // , Writable{ this.disallowWritesInRecovering = conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); + + // disable stats tracking for system tables, but check the config for everything else + this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
false : + conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); } void setHTableSpecificConf() { @@ -5010,18 +5019,18 @@ public class HRegion implements HeapSize { // , Writable{ return results; } - public void mutateRow(RowMutations rm) throws IOException { + public ClientProtos.RegionLoadStats mutateRow(RowMutations rm) throws IOException { // Don't need nonces here - RowMutations only supports puts and deletes - mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); + return mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow())); } /** * Perform atomic mutations within the region w/o nonces. * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)} */ - public void mutateRowsWithLocks(Collection<Mutation> mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations, Collection<byte[]> rowsToLock) throws IOException { - mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); + return mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); } /** @@ -5036,10 +5045,24 @@ public class HRegion implements HeapSize { // , Writable{ * rowsToLock is sorted in order to avoid deadlocks. * @throws IOException */ - public void mutateRowsWithLocks(Collection<Mutation> mutations, + public ClientProtos.RegionLoadStats mutateRowsWithLocks(Collection<Mutation> mutations, Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException { MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock); processRowsWithLocks(proc, -1, nonceGroup, nonce); + return getRegionStats(); + } + + /** + * @return the current load statistics for the region + */ + public ClientProtos.RegionLoadStats getRegionStats() { + if (!regionStatsEnabled) { + return null; + } + ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); + stats.setMemstoreLoad((int) (Math.min(100, + (this.memstoreSize.get() * 100) / this.memstoreFlushSize))); + return stats.build(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 80844a9..7139861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3542,7 +3542,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa processed = checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, qualifier, compareOp, comparator); } else { - mutateRows(region, regionAction.getActionList(), cellScanner); + ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(), + cellScanner); + // add the stats to the response + if (stats != null) { + responseBuilder.addRegionActionResult(RegionActionResult.newBuilder() + .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats))); + } processed = Boolean.TRUE; } } catch (IOException e) { @@ -4500,7 +4506,8 @@ break; case SUCCESS: - builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index)); + builder.addResultOrException(getResultOrException( + ClientProtos.Result.getDefaultInstance(), index, region.getRegionStats())); break; } } @@ -4517,10 +4524,12 @@ public class
HRegionServer implements ClientProtos.ClientService.BlockingInterfa metricsRegionServer.updateDelete(after - before); } } + private static ResultOrException getResultOrException(final ClientProtos.Result r, - final int index) { - return getResultOrException(ResponseConverter.buildActionResult(r), index); + final int index, final ClientProtos.RegionLoadStats stats) { + return getResultOrException(ResponseConverter.buildActionResult(r, stats), index); } + private static ResultOrException getResultOrException(final Exception e, final int index) { return getResultOrException(ResponseConverter.buildActionResult(e), index); } @@ -4589,9 +4598,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa * @param cellScanner if non-null, the mutation data -- the Cell content. * @throws IOException */ - protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions, - final CellScanner cellScanner) - throws IOException { + protected ClientProtos.RegionLoadStats mutateRows(final HRegion region, + final List<ClientProtos.Action> actions, final CellScanner cellScanner) + throws IOException { if (!region.getRegionInfo().isMetaTable()) { cacheFlusher.reclaimMemStoreMemory(); } @@ -4616,7 +4625,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); } } - region.mutateRow(rm); + return region.mutateRow(rm); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index bc1e36c..8198e6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -69,7 +68,6 @@ public class WALEditsReplaySink { private final AtomicLong totalReplayedEdits = new AtomicLong(); private final boolean skipErrors; private final int replayTimeout; - private RpcControllerFactory rpcControllerFactory; /** * Create a sink for WAL log entries replay @@ -88,7 +86,6 @@ public class WALEditsReplaySink { HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS); // a single replay operation time out and default is 60 seconds this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); } /** @@ -161,7 +158,7 @@ private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo, final List<HLog.Entry> entries) throws IOException { try { - RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null); ReplayServerCallable<ReplicateWALEntryResponse> callable = new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc, regionInfo, entries); -- 1.7.12.4 (Apple Git-37)
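
For reviewers who want the moving parts of the pushback mechanism in one place, here is a minimal, self-contained Java sketch of the flow this patch wires up: the server reports a memstore load percentage per region (RegionLoadStats), the client records it last-writer-wins (ServerStatistics), and a backoff policy turns that load into a delay before the next batch is sent. The RegionLoadStats and ServerStatistics stand-ins below mirror the patch's classes; the MAX_BACKOFF_MS constant and the exponential curve in backoffTime are illustrative assumptions, since ExponentialClientBackoffPolicy's actual constants and math are not shown in this patch excerpt.

    import java.util.Map;
    import java.util.TreeMap;

    public class PushbackSketch {

      /** Stand-in for ClientProtos.RegionLoadStats: memstore load in [0, 100]. */
      static class RegionLoadStats {
        final int memstoreLoad;
        RegionLoadStats(int memstoreLoad) { this.memstoreLoad = memstoreLoad; }
      }

      /** Mirrors the patch's ServerStatistics: last-writer-wins stats per region. */
      static class ServerStatistics {
        private final Map<String, Integer> loadByRegion = new TreeMap<String, Integer>();
        void update(String region, RegionLoadStats stats) {
          loadByRegion.put(region, stats.memstoreLoad);
        }
        int getMemstoreLoadPercent(String region) {
          Integer load = loadByRegion.get(region);
          return load == null ? 0 : load;
        }
      }

      // Hypothetical cap on the delay, analogous in role to
      // ExponentialClientBackoffPolicy.DEFAULT_MAX_BACKOFF (value assumed here).
      static final long MAX_BACKOFF_MS = 5 * 60 * 1000L;

      /**
       * Illustrative exponential mapping from load percent to delay: near zero for
       * light load, reaching MAX_BACKOFF_MS at 100. Any strictly increasing curve
       * satisfies the ordering that TestClientExponentialBackoff asserts above.
       */
      static long backoffTime(int memstoreLoadPercent) {
        if (memstoreLoadPercent <= 0) {
          return 0; // no pressure reported, so no delay
        }
        double percent = Math.min(memstoreLoadPercent, 100) / 100.0;
        double scale = (Math.exp(percent * 4) - 1) / (Math.exp(4) - 1);
        return (long) (scale * MAX_BACKOFF_MS);
      }

      public static void main(String[] args) {
        ServerStatistics stats = new ServerStatistics();
        // In the patch the server piggybacks these stats on each multi() response;
        // here we fake a single report for one region.
        stats.update("region-a", new RegionLoadStats(42));
        System.out.println("would delay next batch by "
            + backoffTime(stats.getMemstoreLoadPercent("region-a")) + " ms");
      }
    }

In the patch itself the waiting is not a sleep: AsyncProcess hands the delayed MultiAction to the new DelayingRunner before submitting it to the pool, so a batch bound for a loaded server waits out its backoff without holding up batches destined for other servers.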