From c2f6f479adc9fb108ba69e0407799dbdf5eaefa7 Mon Sep 17 00:00:00 2001 From: nkeywal Date: Fri, 7 Mar 2014 14:00:21 +0000 Subject: [PATCH 17/45] HBASE-10355 Failover RPC's from client using region replicas git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1575261 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hbase/client/AsyncProcess.java | 34 +- .../hadoop/hbase/client/ClusterConnection.java | 12 +- .../hadoop/hbase/client/ConnectionAdapter.java | 6 + .../hadoop/hbase/client/ConnectionManager.java | 3 +- .../org/apache/hadoop/hbase/client/HTable.java | 58 ++- .../client/RpcRetryingCallerWithReadReplicas.java | 282 ++++++++++++ .../hadoop/hbase/client/TestAsyncProcess.java | 2 +- .../hbase/util/BoundedCompletionService.java | 81 ++++ .../hbase/client/CoprocessorHConnection.java | 6 + .../regionserver/StorefileRefresherChore.java | 2 +- .../hadoop/hbase/client/TestReplicasClient.java | 485 +++++++++++++++++++++ .../regionserver/TestRegionServerNoMaster.java | 7 + 12 files changed, 941 insertions(+), 37 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 9419932..252e808 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -92,7 +93,7 @@ class AsyncProcess { protected static final Log LOG = LogFactory.getLog(AsyncProcess.class); protected static final AtomicLong COUNTER = new AtomicLong(); - public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout"; + public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget"; /** * The context used to wait for results from one submit call. @@ -183,7 +184,7 @@ class AsyncProcess { protected int numTries; protected int serverTrackerTimeout; protected int timeout; - protected long primaryCallTimeout; + protected long primaryCallTimeoutMicroseconds; // End configuration settings. protected static class BatchErrors { @@ -242,7 +243,7 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10); + this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); @@ -571,9 +572,9 @@ class AsyncProcess { @Override public void run() { boolean done = false; - if (primaryCallTimeout > 0) { + if (primaryCallTimeoutMicroseconds > 0) { try { - done = waitUntilDone(startTime + primaryCallTimeout); + done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds); } catch (InterruptedException ex) { LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage()); return; @@ -879,7 +880,7 @@ class AsyncProcess { long startTime = EnvironmentEdgeManager.currentTimeMillis(); ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( actionsForReplicaThread, startTime); - if (primaryCallTimeout == 0) { + if (primaryCallTimeoutMicroseconds == 0) { // Start replica calls immediately. replicaRunnable.run(); } else { @@ -1287,23 +1288,26 @@ class AsyncProcess { private boolean waitUntilDone(long cutoff) throws InterruptedException { boolean hasWait = cutoff != Long.MAX_VALUE; - long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis(); + long lastLog = EnvironmentEdgeManager.currentTimeMillis(); long currentInProgress; while (0 != (currentInProgress = actionsInProgress.get())) { - long now = 0; - if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + if (hasWait && (now * 1000L) > cutoff) { return false; } - if (!hasWait) { - // Only log if wait is infinite. - now = EnvironmentEdgeManager.currentTimeMillis(); + if (!hasWait) { // Only log if wait is infinite. if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish"); } - synchronized (actionsInProgress) { - if (actionsInProgress.get() == 0) break; - actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE)); + } + synchronized (actionsInProgress) { + if (actionsInProgress.get() == 0) break; + if (!hasWait) { + actionsInProgress.wait(100); + } else { + long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); + TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); } } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java index 779d15d..fb63473 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java @@ -154,7 +154,17 @@ interface ClusterConnection extends HConnection { final boolean useCache, final boolean offlined) throws IOException; - + /** + * + * @param tableName table to get regions of + * @param row the row + * @param useCache Should we use the cache to retrieve the region information. + * @param retry do we retry + * @return region locations for this row. + * @throws IOException + */ + RegionLocations locateRegion(TableName tableName, + byte[] row, boolean useCache, boolean retry) throws IOException; /** * Returns a {@link MasterKeepAliveConnection} to the active master */ diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 3038fb2..bea5fa8 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -204,6 +204,12 @@ class ConnectionAdapter implements ClusterConnection { } @Override + public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache, + boolean retry) throws IOException { + return wrappedConnection.locateRegion(tableName, row, useCache, retry); + } + + @Override public RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException { return wrappedConnection.locateRegionAll(tableName, row); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 9a4c8e6..98f9d65 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -986,7 +986,8 @@ class ConnectionManager { } - private RegionLocations locateRegion(final TableName tableName, + @Override + public RegionLocations locateRegion(final TableName tableName, final byte [] row, boolean useCache, boolean retry) throws IOException { if (this.closed) throw new IOException(toString() + " closed"); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 5cd0102..fdbd1d4 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -138,8 +138,12 @@ public class HTable implements HTableInterface { private ExecutorService pool; // For Multi private boolean closed; private int operationTimeout; + private int retries; private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() + private Consistency defaultConsistency = Consistency.STRONG; + private int primaryCallTimeoutMicroSecond; + /** The Async process for puts with autoflush set to false or multiputs */ protected AsyncProcess ap; @@ -361,6 +365,10 @@ public class HTable implements HTableInterface { this.scannerCaching = this.configuration.getInt( HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.primaryCallTimeoutMicroSecond = + this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms + this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration); this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); @@ -817,27 +825,41 @@ public class HTable implements HTableInterface { */ @Override public Result get(final Get get) throws IOException { - RegionServerCallable callable = new RegionServerCallable(this.connection, - getName(), get.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - ClientProtos.GetRequest request = - RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; - return ProtobufUtil.toResult(response.getResult()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + if (get.getConsistency() == null){ + get.setConsistency(defaultConsistency); + } + + if (get.getConsistency() == Consistency.STRONG) { + // Good old call. + RegionServerCallable callable = new RegionServerCallable(this.connection, + getName(), get.getRow()) { + @Override + public Result call(int callTimeout) throws IOException { + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + } + + // Call that takes into account the replica + RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( + rpcControllerFactory, tableName, this.connection, get, pool, retries, + operationTimeout, primaryCallTimeoutMicroSecond); + return callable.call(); } + /** * {@inheritDoc} */ diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java new file mode 100644 index 0000000..6afbe01 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -0,0 +1,282 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hbase.client; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.BoundedCompletionService; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Caller that goes to replica if the primary region does no answer within a configurable + * timeout. If the timeout is reached, it calls all the secondary replicas, and returns + * the first answer. If the answer comes from one of the secondary replica, it will + * be marked as stale. + */ +public class RpcRetryingCallerWithReadReplicas { + static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); + protected final ExecutorService pool; + protected final ClusterConnection cConnection; + protected final Configuration conf; + protected final Get get; + protected final TableName tableName; + protected final int timeBeforeReplicas; + private final int callTimeout; + private final int retries; + private final RpcControllerFactory rpcControllerFactory; + + public RpcRetryingCallerWithReadReplicas( + RpcControllerFactory rpcControllerFactory, TableName tableName, + ClusterConnection cConnection, final Get get, + ExecutorService pool, int retries, int callTimeout, + int timeBeforeReplicas) { + this.rpcControllerFactory = rpcControllerFactory; + this.tableName = tableName; + this.cConnection = cConnection; + this.conf = cConnection.getConfiguration(); + this.get = get; + this.pool = pool; + this.retries = retries; + this.callTimeout = callTimeout; + this.timeBeforeReplicas = timeBeforeReplicas; + } + + /** + * A RegionServerCallable that takes into account the replicas, i.e. + * - the call can be on any replica + * - we need to stop retrying when the call is completed + * - we can be interrupted + */ + class ReplicaRegionServerCallable extends RegionServerCallable { + final int id; + + public ReplicaRegionServerCallable(int id, HRegionLocation location) { + super(RpcRetryingCallerWithReadReplicas.this.cConnection, + RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); + this.id = id; + this.location = location; + } + + /** + * Two responsibilities + * - if the call is already completed (by another replica) stops the retries. + * - set the location to the right region, depending on the replica. + */ + @Override + public void prepare(final boolean reload) throws IOException { + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + if (reload || location == null) { + RegionLocations rl = getRegionLocations(false); + location = id < rl.size() ? rl.getRegionLocation(id) : null; + } + + if (location == null) { + // With this exception, there will be a retry. The location can be null for a replica + // when the table is created or after a split. + throw new HBaseIOException("There is no location for replica id #" + id); + } + + ServerName dest = location.getServerName(); + assert dest != null; + + setStub(cConnection.getClient(dest)); + } + + @Override + public Result call(int callTimeout) throws Exception { + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + byte[] reg = location.getRegionInfo().getRegionName(); + + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(reg, get); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + } + + /** + * Adapter to put the HBase retrying caller into a Java callable. + */ + class RetryingRPC implements Callable { + final RetryingCallable callable; + + RetryingRPC(RetryingCallable callable) { + this.callable = callable; + } + + @Override + public Result call() throws IOException { + return new RpcRetryingCallerFactory(conf).newCaller(). + callWithRetries(callable, callTimeout); + } + } + + /** + * Algo: + * - we put the query into the execution pool. + * - after x ms, if we don't have a result, we add the queries for the secondary replicas + * - we take the first answer + * - when done, we cancel what's left. Cancelling means: + * - removing from the pool if the actual call was not started + * - interrupting the call if it has started + * Client side, we need to take into account + * - a call is not executed immediately after being put into the pool + * - a call is a thread. Let's not multiply the number of thread by the number of replicas. + * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call + * can take some i/o. + *

+ * Globally, the number of retries, timeout and so on still applies, but it's per replica, + * not global. We continue until all retries are done, or all timeouts are exceeded. + */ + public synchronized Result call() + throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { + RegionLocations rl = getRegionLocations(true); + BoundedCompletionService cs = new BoundedCompletionService(pool, rl.size()); + + addCallsForReplica(cs, rl, 0, 0); // primary. + + try { + Future f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f == null) { + addCallsForReplica(cs, rl, 1, rl.size() - 1); // secondaries + f = cs.take(); + } + return f.get(); + } catch (ExecutionException e) { + throwEnrichedException(e); + return null; // unreachable + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } finally { + // We get there because we were interrupted or because one or more of the + // calls succeeded or failed. In all case, we stop all our tasks. + cs.cancelAll(true); + } + } + + /** + * Extract the real exception from the ExecutionException, and throws what makes more + * sense. + */ + private void throwEnrichedException(ExecutionException e) + throws RetriesExhaustedException, DoNotRetryIOException { + Throwable t = e.getCause(); + assert t != null; // That's what ExecutionException is about: holding an exception + + if (t instanceof RetriesExhaustedException) { + throw (RetriesExhaustedException) t; + } + + if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) t; + } + + RetriesExhaustedException.ThrowableWithExtraContext qt = + new RetriesExhaustedException.ThrowableWithExtraContext(t, + EnvironmentEdgeManager.currentTimeMillis(), toString()); + + List exceptions = + Collections.singletonList(qt); + + throw new RetriesExhaustedException(retries, exceptions); + } + + /** + * Creates the calls and submit them + * + * @param cs - the completion service to use for submitting + * @param rl - the region locations + * @param min - the id of the first replica, inclusive + * @param max - the id of the last replica, inclusive. + */ + private void addCallsForReplica(BoundedCompletionService cs, + RegionLocations rl, int min, int max) { + for (int id = min; id <= max; id++) { + HRegionLocation hrl = rl.getRegionLocation(id); + ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); + RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica); + cs.submit(retryingOnReplica); + } + } + + private RegionLocations getRegionLocations(boolean useCache) + throws RetriesExhaustedException, DoNotRetryIOException { + RegionLocations rl; + try { + rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true); + } catch (IOException e) { + if (e instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) e; + } else if (e instanceof RetriesExhaustedException) { + throw (RetriesExhaustedException) e; + } else { + throw new RetriesExhaustedException("Can't get the location", e); + } + } + if (rl == null) { + throw new RetriesExhaustedException("Can't get the locations"); + } + + return rl; + } +} \ No newline at end of file diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 575827f..cc69b4f 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -968,7 +968,7 @@ public class TestAsyncProcess { // that the replica call has happened and that way control the ordering. Configuration conf = new Configuration(); ClusterConnection conn = createHConnectionWithReplicas(); - conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs); + conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); if (retries > 0) { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); } diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java new file mode 100644 index 0000000..514505b --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +/** + * A completion service, close to the one available in the JDK 1.7 + * However, this ones keeps the list of the future, and allows to cancel them all. + * This means as well that it can be used for a small set of tasks only. + */ +public class BoundedCompletionService { + private final Executor executor; + private final List> sent; // alls the call we sent + private final BlockingQueue> completed; // all the results we got so far. + + class QueueingFuture extends FutureTask { + + public QueueingFuture(Callable callable) { + super(callable); + } + + protected void done() { + completed.add(QueueingFuture.this); + } + } + + public BoundedCompletionService(Executor executor, int maxTasks) { + this.executor = executor; + this.sent = new ArrayList>(maxTasks); + this.completed = new ArrayBlockingQueue>(maxTasks); + } + + + public Future submit(Callable task) { + QueueingFuture newFuture = new QueueingFuture(task); + executor.execute(newFuture); + sent.add(newFuture); + return newFuture; + } + + public Future take() throws InterruptedException{ + return completed.take(); + } + + public Future poll(long timeout, TimeUnit unit) throws InterruptedException{ + return completed.poll(timeout, unit); + } + + public void cancelAll(boolean interrupt) { + for (Future future : sent) { + future.cancel(interrupt); + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index e2be188..646c86d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -288,6 +288,12 @@ class CoprocessorHConnection implements ClusterConnection { } @Override + public RegionLocations locateRegion(TableName tableName, byte[] row, + boolean useCache, boolean retry) throws IOException { + return delegate.locateRegion(tableName, row, useCache, retry); + } + + @Override public List locateRegions(byte[] tableName, boolean useCache, boolean offlined) throws IOException { return delegate.locateRegions(tableName, useCache, offlined); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java index ab8af7d..1665713 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java @@ -47,7 +47,7 @@ public class StorefileRefresherChore extends Chore { /** * The period (in milliseconds) for refreshing the store files for the secondary regions. */ - static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD + public static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD = "hbase.regionserver.storefile.refresh.period"; static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java new file mode 100644 index 0000000..84fa5da --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -0,0 +1,485 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; +import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole + * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}. + */ +@Category(MediumTests.class) +public class TestReplicasClient { + private static final Log LOG = LogFactory.getLog(TestReplicasClient.class); + + private static final int NB_SERVERS = 1; + private static HTable table = null; + private static final byte[] row = TestReplicasClient.class.getName().getBytes(); + + private static HRegionInfo hriPrimary; + private static HRegionInfo hriSecondary; + + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final byte[] f = HConstants.CATALOG_FAMILY; + + private final static int REFRESH_PERIOD = 1000; + + /** + * This copro is used to synchronize the tests. + */ + public static class SlowMeCopro extends BaseRegionObserver { + static final AtomicLong sleepTime = new AtomicLong(0); + static final AtomicReference cdl = + new AtomicReference(new CountDownLatch(0)); + + public SlowMeCopro() { + } + + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + + if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { + CountDownLatch latch = cdl.get(); + try { + if (sleepTime.get() > 0) { + LOG.info("Sleeping for " + sleepTime.get() + " ms"); + Thread.sleep(sleepTime.get()); + } else if (latch.getCount() > 0) { + LOG.info("Waiting for the counterCountDownLatch"); + latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. + if (latch.getCount() > 0) { + throw new RuntimeException("Can't wait more"); + } + } + } catch (InterruptedException e1) { + LOG.error(e1); + } + } else { + LOG.info("We're not the primary replicas."); + } + } + } + + @BeforeClass + public static void beforeClass() throws Exception { + // enable store file refreshing + HTU.getConfiguration().setInt( + StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD); + + HTU.startMiniCluster(NB_SERVERS); + + // Create table then get the single region for our new table. + HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName()); + hdt.addCoprocessor(SlowMeCopro.class.getName()); + table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration()); + + hriPrimary = table.getRegionLocation(row, false).getRegionInfo(); + + // mock a secondary region info to open + hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(), + hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1); + + // No master + LOG.info("Master is going to be stopped"); + TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU); + Configuration c = new Configuration(HTU.getConfiguration()); + c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + HBaseAdmin ha = new HBaseAdmin(c); + for (boolean masterRuns = true; masterRuns; ) { + Thread.sleep(100); + try { + masterRuns = false; + masterRuns = ha.isMasterRunning(); + } catch (MasterNotRunningException ignored) { + } + } + LOG.info("Master has stopped"); + } + + @AfterClass + public static void afterClass() throws Exception { + if (table != null) table.close(); + HTU.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + HTU.getHBaseAdmin().getConnection().clearRegionCache(); + } + + @After + public void after() throws IOException, KeeperException { + try { + closeRegion(hriSecondary); + } catch (Exception ignored) { + } + ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary); + ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary); + + HTU.getHBaseAdmin().getConnection().clearRegionCache(); + } + + private HRegionServer getRS() { + return HTU.getMiniHBaseCluster().getRegionServer(0); + } + + private void openRegion(HRegionInfo hri) throws Exception { + ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); + // first version is '0' + AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest( + getRS().getServerName(), hri, 0, null, null); + AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); + Assert.assertEquals(responseOpen.getOpeningStateCount(), 1); + Assert.assertEquals(responseOpen.getOpeningState(0), + AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED); + checkRegionIsOpened(hri); + } + + private void closeRegion(HRegionInfo hri) throws Exception { + ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); + + AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest( + getRS().getServerName(), hri.getEncodedName(), true); + AdminProtos.CloseRegionResponse responseClose = getRS() + .getRSRpcServices().closeRegion(null, crr); + Assert.assertTrue(responseClose.getClosed()); + + checkRegionIsClosed(hri.getEncodedName()); + + ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null); + } + + private void checkRegionIsOpened(HRegionInfo hri) throws Exception { + + while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + Thread.sleep(1); + } + + Assert.assertTrue( + ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null)); + } + + private void checkRegionIsClosed(String encodedRegionName) throws Exception { + + while (!getRS().getRegionsInTransitionInRS().isEmpty()) { + Thread.sleep(1); + } + + try { + Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable()); + } catch (NotServingRegionException expected) { + // That's how it work: if the region is closed we have an exception. + } + + // We don't delete the znode here, because there is not always a znode. + } + + private void flushRegion(HRegionInfo regionInfo) throws IOException { + for (RegionServerThread rst : HTU.getMiniHBaseCluster().getRegionServerThreads()) { + HRegion region = rst.getRegionServer().getRegionByEncodedName(regionInfo.getEncodedName()); + if (region != null) { + region.flushcache(); + return; + } + } + throw new IOException("Region to flush cannot be found"); + } + + @Test + public void testUseRegionWithoutReplica() throws Exception { + byte[] b1 = "testUseRegionWithoutReplica".getBytes(); + openRegion(hriSecondary); + SlowMeCopro.cdl.set(new CountDownLatch(0)); + try { + Get g = new Get(b1); + Result r = table.get(g); + Assert.assertFalse(r.isStale()); + } finally { + closeRegion(hriSecondary); + } + } + + @Test + public void testLocations() throws Exception { + byte[] b1 = "testLocations".getBytes(); + openRegion(hriSecondary); + ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection(); + + try { + hc.clearRegionCache(); + RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false); + Assert.assertEquals(2, rl.size()); + + rl = hc.locateRegion(table.getName(), b1, true, false); + Assert.assertEquals(2, rl.size()); + + hc.clearRegionCache(); + rl = hc.locateRegion(table.getName(), b1, true, false); + Assert.assertEquals(2, rl.size()); + + rl = hc.locateRegion(table.getName(), b1, false, false); + Assert.assertEquals(2, rl.size()); + } finally { + closeRegion(hriSecondary); + } + } + + @Test + public void testGetNoResultNoStaleRegionWithReplica() throws Exception { + byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes(); + openRegion(hriSecondary); + + try { + // A get works and is not stale + Get g = new Get(b1); + Result r = table.get(g); + Assert.assertFalse(r.isStale()); + } finally { + closeRegion(hriSecondary); + } + } + + + @Test + public void testGetNoResultStaleRegionWithReplica() throws Exception { + byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes(); + openRegion(hriSecondary); + + SlowMeCopro.cdl.set(new CountDownLatch(1)); + try { + Get g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + Result r = table.get(g); + Assert.assertTrue(r.isStale()); + } finally { + SlowMeCopro.cdl.get().countDown(); + closeRegion(hriSecondary); + } + } + + @Test + public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception { + byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes(); + openRegion(hriSecondary); + + try { + // We sleep; but we won't go to the stale region as we don't get the stale by default. + SlowMeCopro.sleepTime.set(2000); + Get g = new Get(b1); + Result r = table.get(g); + Assert.assertFalse(r.isStale()); + + } finally { + SlowMeCopro.sleepTime.set(0); + closeRegion(hriSecondary); + } + } + + + @Test + public void testFlushTable() throws Exception { + openRegion(hriSecondary); + try { + flushRegion(hriPrimary); + flushRegion(hriSecondary); + + Put p = new Put(row); + p.add(f, row, row); + table.put(p); + + flushRegion(hriPrimary); + flushRegion(hriSecondary); + } finally { + Delete d = new Delete(row); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testFlushPrimary() throws Exception { + openRegion(hriSecondary); + + try { + flushRegion(hriPrimary); + + Put p = new Put(row); + p.add(f, row, row); + table.put(p); + + flushRegion(hriPrimary); + } finally { + Delete d = new Delete(row); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testFlushSecondary() throws Exception { + openRegion(hriSecondary); + try { + flushRegion(hriSecondary); + + Put p = new Put(row); + p.add(f, row, row); + table.put(p); + + flushRegion(hriSecondary); + } catch (TableNotFoundException expected) { + } finally { + Delete d = new Delete(row); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testUseRegionWithReplica() throws Exception { + byte[] b1 = "testUseRegionWithReplica".getBytes(); + openRegion(hriSecondary); + + try { + // A simple put works, even if there here a second replica + Put p = new Put(b1); + p.add(f, b1, b1); + table.put(p); + LOG.info("Put done"); + + // A get works and is not stale + Get g = new Get(b1); + Result r = table.get(g); + Assert.assertFalse(r.isStale()); + Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); + LOG.info("get works and is not stale done"); + + // Even if it we have to wait a little on the main region + SlowMeCopro.sleepTime.set(2000); + g = new Get(b1); + r = table.get(g); + Assert.assertFalse(r.isStale()); + Assert.assertFalse(r.getColumnCells(f, b1).isEmpty()); + SlowMeCopro.sleepTime.set(0); + LOG.info("sleep and is not stale done"); + + // But if we ask for stale we will get it + SlowMeCopro.cdl.set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertTrue(r.isStale()); + Assert.assertTrue(r.getColumnCells(f, b1).isEmpty()); + SlowMeCopro.cdl.get().countDown(); + + LOG.info("stale done"); + + // exists works and is not stale + g = new Get(b1); + g.setCheckExistenceOnly(true); + r = table.get(g); + Assert.assertFalse(r.isStale()); + Assert.assertTrue(r.getExists()); + LOG.info("exists not stale done"); + + // exists works on stale but don't see the put + SlowMeCopro.cdl.set(new CountDownLatch(1)); + g = new Get(b1); + g.setCheckExistenceOnly(true); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertTrue(r.isStale()); + Assert.assertFalse("The secondary has stale data", r.getExists()); + SlowMeCopro.cdl.get().countDown(); + LOG.info("exists stale before flush done"); + + flushRegion(hriPrimary); + flushRegion(hriSecondary); + LOG.info("flush done"); + Thread.sleep(1000 + REFRESH_PERIOD * 2); + + // get works and is not stale + SlowMeCopro.cdl.set(new CountDownLatch(1)); + g = new Get(b1); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertTrue(r.isStale()); + Assert.assertFalse(r.isEmpty()); + SlowMeCopro.cdl.get().countDown(); + LOG.info("stale done"); + + // exists works on stale and we see the put after the flush + SlowMeCopro.cdl.set(new CountDownLatch(1)); + g = new Get(b1); + g.setCheckExistenceOnly(true); + g.setConsistency(Consistency.TIMELINE); + r = table.get(g); + Assert.assertTrue(r.isStale()); + Assert.assertTrue(r.getExists()); + SlowMeCopro.cdl.get().countDown(); + LOG.info("exists stale after flush done"); + + } finally { + SlowMeCopro.cdl.get().countDown(); + SlowMeCopro.sleepTime.set(0); + Delete d = new Delete(b1); + table.delete(d); + closeRegion(hriSecondary); + } + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java index cc87eb3..65a143d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -52,6 +53,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mortbay.log.Log; import com.google.protobuf.ServiceException; @@ -95,6 +97,11 @@ public class TestRegionServerNoMaster { // No master HTU.getHBaseCluster().getMaster().stopMaster(); + Log.info("Waiting until master thread exits"); + while (HTU.getHBaseCluster().getMasterThread() != null + && HTU.getHBaseCluster().getMasterThread().isAlive()) { + Threads.sleep(100); + } // Master is down, so is the meta. We need to assign it somewhere // so that regions can be assigned during the mocking phase. HRegionServer hrs = HTU.getHBaseCluster().getRegionServer(0); -- 2.0.0