From cfeb101049e8754eb97b9ca561734e296c0d8c0c Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Tue, 18 Oct 2016 10:35:18 +0800
Subject: [PATCH] HBASE-16836 Implement increment and append

---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  15 +++
 .../org/apache/hadoop/hbase/client/AsyncTable.java |  91 +++++++++++++++-
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |  87 +++++++++++----
 .../hbase/client/ConnectionImplementation.java     |  21 +---
 .../hadoop/hbase/client/ConnectionUtils.java       |  20 ++++
 .../org/apache/hadoop/hbase/client/HTable.java     |  27 ++---
 .../apache/hadoop/hbase/client/NonceGenerator.java |   6 +-
 .../client/PerClientRandomNonceGenerator.java      |  20 +++-
 .../hbase/client/CoprocessorHConnection.java       |   7 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java |  92 +++++++++++++---
 .../hbase/client/TestAsyncTableNoncedRetry.java    | 121 +++++++++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java     |  17 ++-
 .../hbase/master/TestDistributedLogSplitting.java  |  11 +-
 13 files changed, 440 insertions(+), 95 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c50e244..7a8fd9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
+import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 
 import io.netty.util.HashedWheelTimer;
 
@@ -80,8 +82,11 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   final AsyncRpcRetryingCallerFactory callerFactory;
 
+  private final NonceGenerator nonceGenerator;
+
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
 
+  @SuppressWarnings("deprecation")
   public AsyncConnectionImpl(Configuration conf, User user) throws IOException {
     this.conf = conf;
     this.user = user;
@@ -103,6 +108,11 @@ class AsyncConnectionImpl implements AsyncConnection {
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
     this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
     this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
+    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
+      nonceGenerator = PerClientRandomNonceGenerator.get();
+    } else {
+      nonceGenerator = NO_NONCE_GENERATOR;
+    }
   }
 
   @Override
@@ -127,6 +137,11 @@ class AsyncConnectionImpl implements AsyncConnection {
     return locator;
   }
 
+  // ditto
+  public NonceGenerator getNonceGenerator() {
+    return nonceGenerator;
+  }
+
   private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
     return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
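Note on the hunk above: whether the async connection hands out real nonces is gated on hbase.client.nonces.enabled (default true), the key this patch moves onto NonceGenerator. A minimal sketch of how a client could opt out — everything here besides the configuration key is illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DisableNoncesExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // With nonces disabled, AsyncConnectionImpl falls back to
        // ConnectionUtils.NO_NONCE_GENERATOR, so increments/appends carry
        // HConstants.NO_NONCE and retried mutations are no longer
        // deduplicated on the server side.
        conf.setBoolean("hbase.client.nonces.enabled", false);
        System.out.println("client nonces enabled: "
            + conf.getBoolean("hbase.client.nonces.enabled", true));
      }
    }
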
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index c4e7cec..2ed3c26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.base.Preconditions;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -24,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}.
@@ -99,28 +103,105 @@ public interface AsyncTable {
    * This will return true if the Get matches one or more keys, false if not.
    * <p>
    * This is a server-side call so it prevents any data from being transferred to the client.
+   * @return true if the specified Get matches one or more keys, false if not. The return value
+   *         will be wrapped by a {@link CompletableFuture}.
    */
-  CompletableFuture<Boolean> exists(Get get);
+  default CompletableFuture<Boolean> exists(Get get) {
+    if (!get.isCheckExistenceOnly()) {
+      get = ReflectionUtils.newInstance(get.getClass(), get);
+      get.setCheckExistenceOnly(true);
+    }
+    return get(get).thenApply(r -> r.getExists());
+  }
 
   /**
    * Extracts certain cells from a given row.
-   * <p>
-   * Return the data coming from the specified row, if it exists. If the row specified doesn't
-   * exist, the {@link Result} instance returned won't contain any
-   * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
    * @param get The object that specifies what data to fetch and from which row.
+   * @return The data coming from the specified row, if it exists. If the row specified doesn't
+   *         exist, the {@link Result} instance returned won't contain any
+   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}.
+   *         The return value will be wrapped by a {@link CompletableFuture}.
    */
   CompletableFuture<Result> get(Get get);
 
   /**
    * Puts some data to the table.
    * @param put The data to put.
+   * @return A {@link CompletableFuture} that always returns null when it completes normally.
    */
   CompletableFuture<Void> put(Put put);
 
   /**
    * Deletes the specified cells/row.
    * @param delete The object that specifies what to delete.
+   * @return A {@link CompletableFuture} that always returns null when it completes normally.
    */
   CompletableFuture<Void> delete(Delete delete);
+
+  /**
+   * Appends values to one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
+   * write operations to a row are synchronized, but readers do not take row locks so get and scan
+   * operations can see this operation partially completed.
+   * @param append object that specifies the columns and values to be used for the append
+   *          operations
+   * @return values of columns after the append operation (may be null). The return value will be
+   *         wrapped by a {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> append(Append append);
+
+  /**
+   * Increments one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers. Increments are done under a single row lock,
+   * so write operations to a row are synchronized, but readers do not take row locks so get and
+   * scan operations can see this operation partially completed.
+   * @param increment object that specifies the columns and amounts to be used for the increment
+   *          operations
+   * @return values of columns after the increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  CompletableFuture<Result> increment(Increment increment);
+
+  /**
+   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
+   * <p>
+   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount) {
+    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
+  }
+
+  /**
+   * Atomically increments a column value. If the column value already exists and is not a
+   * big-endian long, this could throw an exception. If the column value does not yet exist it is
+   * initialized to amount and written to the specified column.
+   * <p>
+   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
+   * any increments that have not been flushed.
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
+   * @param durability The persistence guarantee for this increment.
+   * @return The new value, post increment. The return value will be wrapped by a
+   *         {@link CompletableFuture}.
+   */
+  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount, Durability durability) {
+    Preconditions.checkNotNull(row, "row is null");
+    Preconditions.checkNotNull(family, "family is null");
+    Preconditions.checkNotNull(qualifier, "qualifier is null");
+    return increment(
+      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
+          .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
+  }
 }
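For readers new to the interface above, a short usage sketch of the new write paths. This is not part of the patch: the table name, family, and qualifiers are hypothetical, and a running cluster plus an open AsyncConnection are assumed.

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Append;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AsyncTableUsageSketch {

      // Fire an increment and an append without blocking; block only at the end.
      public static void run(AsyncConnection conn) {
        AsyncTable table = conn.getTable(TableName.valueOf("example"));
        byte[] row = Bytes.toBytes("row-1");
        byte[] cf = Bytes.toBytes("cf");

        CompletableFuture<Long> hits =
            table.incrementColumnValue(row, cf, Bytes.toBytes("hits"), 1L);
        CompletableFuture<Result> log =
            table.append(new Append(row).add(cf, Bytes.toBytes("log"), Bytes.toBytes("visit;")));

        // Combine both results once they complete; join() is the only blocking call.
        hits.thenAcceptBoth(log, (count, r) -> System.out.println(
            "hits=" + count + " log=" + Bytes.toString(r.getValue(cf, Bytes.toBytes("log")))))
            .join();
      }
    }
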
@@ -111,6 +112,45 @@ class AsyncTableImpl implements AsyncTable {
     return future;
   }
 
+  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<MutateRequest, byte[], REQ> reqConvert,
+      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
+      respConverter);
+  }
+
+  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      Converter<MutateRequest, byte[], REQ> reqConvert) {
+    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
+      return null;
+    });
+  }
+
+  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
+      throws IOException {
+    if (!resp.hasResult()) {
+      return null;
+    }
+    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
+  }
+
+  @FunctionalInterface
+  private interface NoncedConverter<D, I, S> {
+    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
+  }
+
+  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(HBaseRpcController controller,
+      HRegionLocation loc, ClientService.Interface stub, REQ req,
+      NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
+      Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
+    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+    long nonce = conn.getNonceGenerator().newNonce();
+    return mutate(controller, loc, stub, req,
+      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
+  }
+
   private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
     return conn.callerFactory.<T> single().table(tableName).row(row.getRow())
         .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
@@ -118,15 +158,6 @@ class AsyncTableImpl implements AsyncTable {
   }
 
   @Override
-  public CompletableFuture<Boolean> exists(Get get) {
-    if (!get.isCheckExistenceOnly()) {
-      get = ReflectionUtils.newInstance(get.getClass(), get);
-      get.setCheckExistenceOnly(true);
-    }
-    return get(get).thenApply(r -> r.getExists());
-  }
-
-  @Override
   public CompletableFuture<Result> get(Get get) {
     return this.<Result> newCaller(get, readRpcTimeoutNs)
         .action((controller, loc, stub) -> AsyncTableImpl
@@ -138,25 +169,35 @@ class AsyncTableImpl implements AsyncTable {
 
   @Override
   public CompletableFuture<Void> put(Put put) {
-    return this.<Void> newCaller(put, writeRpcTimeoutNs)
-        .action(
-          (controller, loc, stub) -> AsyncTableImpl.<Put, MutateRequest, MutateResponse, Void> call(
-            controller, loc, stub, put, RequestConverter::buildMutateRequest,
-            (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> {
-              return null;
-            }))
+    return this
+        .<Void> newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl
+            .<Put> voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest))
         .call();
   }
 
   @Override
   public CompletableFuture<Void> delete(Delete delete) {
     return this.<Void> newCaller(delete, writeRpcTimeoutNs)
-        .action((controller, loc, stub) -> AsyncTableImpl
-            .<Delete, MutateRequest, MutateResponse, Void> call(controller, loc, stub, delete,
-              RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done),
-              (c, resp) -> {
-                return null;
-              }))
+        .action((controller, loc, stub) -> AsyncTableImpl.<Delete> voidMutate(controller, loc,
+          stub, delete, RequestConverter::buildMutateRequest))
         .call();
   }
+
+  @Override
+  public CompletableFuture<Result> append(Append append) {
+    checkHasFamilies(append);
+    return this.<Result> newCaller(append, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
+          append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
+        .call();
+  }
+
+  @Override
+  public CompletableFuture<Result> increment(Increment increment) {
+    checkHasFamilies(increment);
+    return this.<Result> newCaller(increment, writeRpcTimeoutNs)
+        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
+          stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult))
+        .call();
+  }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ae8c57e..922168d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
@@ -108,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 class ConnectionImplementation implements ClusterConnection, Closeable {
   public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
   private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class);
-  private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
+
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
 
   private final boolean hostnamesCanChange;
@@ -199,14 +200,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.rpcTimeout = conf.getInt(
         HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
+    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
       synchronized (nonceGeneratorCreateLock) {
         if (nonceGenerator == null) {
-          nonceGenerator = new PerClientRandomNonceGenerator();
+          nonceGenerator = PerClientRandomNonceGenerator.get();
         }
       }
     } else {
-      nonceGenerator = new NoNonceGenerator();
+      nonceGenerator = NO_NONCE_GENERATOR;
     }
 
     this.stats = ServerStatisticTracker.create(conf);
@@ -948,18 +949,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
   }
 
-  /** Dummy nonce generator for disabled nonces. */
-  static class NoNonceGenerator implements NonceGenerator {
-    @Override
-    public long getNonceGroup() {
-      return HConstants.NO_NONCE;
-    }
-    @Override
-    public long newNonce() {
-      return HConstants.NO_NONCE;
-    }
-  }
-
   /**
    * The record of errors for servers.
    */
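One behavioral nuance worth flagging before the ConnectionUtils hunk that follows: checkHasFamilies now rejects an empty mutation with an IllegalArgumentException from Guava's Preconditions, whereas the private HTable helper it replaces (removed further down) threw an IOException. A toy illustration, not part of the patch — same-package placement is assumed because the helper is package-private:

    package org.apache.hadoop.hbase.client;

    import org.apache.hadoop.hbase.util.Bytes;

    public class CheckHasFamiliesSketch {
      public static void main(String[] args) {
        // An Increment with a row but no columns: numFamilies() == 0.
        Increment inc = new Increment(Bytes.toBytes("row"));
        try {
          ConnectionUtils.checkHasFamilies(inc);
        } catch (IllegalArgumentException e) {
          // The old HTable helper threw IOException here; callers that
          // caught IOException specifically will no longer see this failure.
          System.out.println(e.getMessage());
        }
      }
    }
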
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2f5d2b1..eca9ad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -202,4 +203,23 @@ public final class ConnectionUtils {
     }
     return serviceName + "@" + hostname + ":" + port;
   }
+
+  static void checkHasFamilies(Mutation mutation) {
+    Preconditions.checkArgument(mutation.numFamilies() > 0,
+      "Invalid arguments to %s, zero columns specified", mutation.toString());
+  }
+
+  /** Dummy nonce generator for disabled nonces. */
+  static final NonceGenerator NO_NONCE_GENERATOR = new NonceGenerator() {
+
+    @Override
+    public long newNonce() {
+      return HConstants.NO_NONCE;
+    }
+
+    @Override
+    public long getNonceGroup() {
+      return HConstants.NO_NONCE;
+    }
+  };
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 2802a2c..8d024dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,6 +18,16 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+
+import com.google.common.annotations.VisibleForTesting;
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
+// Internally, we use shaded protobuf. These below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -49,8 +59,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.client.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+//SEE ABOVE NOTE!
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -65,15 +75,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
-import com.google.common.annotations.VisibleForTesting;
-// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
-// Internally, we use shaded protobuf. This below are part of our public API.
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-// SEE ABOVE NOTE!
-
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase table.
  * Lightweight. Get as needed and just close when done.
@@ -617,12 +618,6 @@ public class HTable implements Table {
     }
   }
 
-  private static void checkHasFamilies(final Mutation mutation) throws IOException {
-    if (mutation.numFamilies() == 0) {
-      throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java
index a121dde..6b5e8a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public interface NonceGenerator {
 
+  static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
+
   /** @return the nonce group (client ID) of this client manager. */
-  public long getNonceGroup();
+  long getNonceGroup();
 
   /** @return New nonce. */
-  public long newNonce();
+  long newNonce();
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PerClientRandomNonceGenerator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PerClientRandomNonceGenerator.java
index 875e1f6..6d9e55a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PerClientRandomNonceGenerator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PerClientRandomNonceGenerator.java
@@ -25,17 +25,20 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
- * NonceGenerator implementation that uses client ID hash + random int as nonce group,
- * and random numbers as nonces.
+ * NonceGenerator implementation that uses client ID hash + random int as nonce group, and random
+ * numbers as nonces.
  */
 @InterfaceAudience.Private
-public class PerClientRandomNonceGenerator implements NonceGenerator {
+public final class PerClientRandomNonceGenerator implements NonceGenerator {
+
+  private static final PerClientRandomNonceGenerator INST = new PerClientRandomNonceGenerator();
+
   private final Random rdm = new Random();
   private final long clientId;
 
-  public PerClientRandomNonceGenerator() {
+  private PerClientRandomNonceGenerator() {
     byte[] clientIdBase = ClientIdGenerator.generateClientId();
-    this.clientId = (((long)Arrays.hashCode(clientIdBase)) << 32) + rdm.nextInt();
+    this.clientId = (((long) Arrays.hashCode(clientIdBase)) << 32) + rdm.nextInt();
   }
 
   public long getNonceGroup() {
@@ -49,4 +52,11 @@ public class PerClientRandomNonceGenerator implements NonceGenerator {
     } while (result == HConstants.NO_NONCE);
     return result;
   }
+
+  /**
+   * Get the singleton nonce generator.
+   */
+  public static PerClientRandomNonceGenerator get() {
+    return INST;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index 2afe6cf..1d1cce9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CoprocessorHConnection extends ConnectionImplementation {
-  private static final NonceGenerator NO_NONCE_GEN = new NoNonceGenerator();
 
   /**
    * Create a {@link ClusterConnection} based on the environment in which we are running the
@@ -101,6 +100,6 @@ public class CoprocessorHConnection extends ConnectionImplementation {
 
   @Override
   public NonceGenerator getNonceGenerator() {
-    return NO_NONCE_GEN; // don't use nonces for coprocessor connection
+    return ConnectionUtils.NO_NONCE_GENERATOR; // don't use nonces for coprocessor connection
   }
 }
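Since PerClientRandomNonceGenerator is now final with a private constructor, subclassing (as the two tests below previously did) no longer compiles; wrapping the shared singleton is the replacement pattern. A minimal delegating wrapper, mirroring what the updated tests do:

    package org.apache.hadoop.hbase.client;

    public class DelegatingNonceGenerator implements NonceGenerator {

      private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();

      @Override
      public long getNonceGroup() {
        return delegate.getNonceGroup();
      }

      @Override
      public long newNonce() {
        // Customization point: record, duplicate, or otherwise rewrite nonces here.
        return delegate.newNonce();
      }
    }
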
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 7010c7f..41002cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -18,12 +18,17 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,9 +38,12 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 @Category({ MediumTests.class, ClientTests.class })
 public class TestAsyncTable {
@@ -48,14 +56,17 @@ public class TestAsyncTable {
 
   private static byte[] QUALIFIER = Bytes.toBytes("cq");
 
-  private static byte[] ROW = Bytes.toBytes("row");
-
   private static byte[] VALUE = Bytes.toBytes("value");
 
   private static AsyncConnection ASYNC_CONN;
 
+  @Rule
+  public TestName testName = new TestName();
+
+  private byte[] row;
+
   @BeforeClass
-  public static void setUp() throws Exception {
+  public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
@@ -63,22 +74,27 @@ public class TestAsyncTable {
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDownAfterClass() throws Exception {
     ASYNC_CONN.close();
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
+  }
+
   @Test
-  public void test() throws Exception {
+  public void testSimple() throws Exception {
     AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
-    table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
-    assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
-    Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
+    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
     assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
-    table.delete(new Delete(ROW)).get();
-    result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
+    table.delete(new Delete(row)).get();
+    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
     assertTrue(result.isEmpty());
-    assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get());
+    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
   }
 
   private byte[] concat(byte[] base, int index) {
@@ -86,24 +102,24 @@ public class TestAsyncTable {
   }
 
   @Test
-  public void testMultiple() throws Exception {
+  public void testSimpleMultiple() throws Exception {
     AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
     int count = 100;
     CountDownLatch putLatch = new CountDownLatch(count);
     IntStream.range(0, count).forEach(
-      i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
+      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
           .thenAccept(x -> putLatch.countDown()));
     putLatch.await();
     BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
     IntStream.range(0, count)
-        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
            .thenAccept(x -> existsResp.add(x)));
     for (int i = 0; i < count; i++) {
       assertTrue(existsResp.take());
     }
     BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
     IntStream.range(0, count)
-        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
     for (int i = 0; i < count; i++) {
       Pair<Integer, Result> pair = getResp.take();
@@ -112,20 +128,60 @@ public class TestAsyncTable {
     }
     CountDownLatch deleteLatch = new CountDownLatch(count);
     IntStream.range(0, count).forEach(
-      i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown()));
+      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
     deleteLatch.await();
     IntStream.range(0, count)
-        .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
            .thenAccept(x -> existsResp.add(x)));
     for (int i = 0; i < count; i++) {
       assertFalse(existsResp.take());
     }
     IntStream.range(0, count)
-        .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER))
+        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
     for (int i = 0; i < count; i++) {
       Pair<Integer, Result> pair = getResp.take();
       assertTrue(pair.getSecond().isEmpty());
     }
   }
+
+  @Test
+  public void testIncrement() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 100;
+    CountDownLatch latch = new CountDownLatch(count);
+    AtomicLong sum = new AtomicLong(0L);
+    IntStream.range(0, count)
+        .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
+          sum.addAndGet(x);
+          latch.countDown();
+        }));
+    latch.await();
+    assertEquals(count, Bytes.toLong(
+      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
+    assertEquals((1 + count) * count / 2, sum.get());
+  }
+
+  @Test
+  public void testAppend() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    int count = 10;
+    CountDownLatch latch = new CountDownLatch(count);
+    char suffix = ':';
+    AtomicLong suffixCount = new AtomicLong(0L);
+    IntStream.range(0, count).forEachOrdered(
+      i -> table.append(new Append(row).add(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
+          .thenAccept(r -> {
+            suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
+                .filter(x -> x == suffix).count());
+            latch.countDown();
+          }));
+    latch.await();
+    assertEquals((1 + count) * count / 2, suffixCount.get());
+    String value = Bytes.toString(
+      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
+    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
+        .sorted().toArray();
+    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
new file mode 100644
index 0000000..8fc0f60
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableNoncedRetry {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+
+  private static byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection ASYNC_CONN;
+
+  private static long NONCE = 1L;
+
+  private static NonceGenerator NONCE_GENERATOR = new NonceGenerator() {
+
+    @Override
+    public long newNonce() {
+      return NONCE;
+    }
+
+    @Override
+    public long getNonceGroup() {
+      return 1L;
+    }
+  };
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private byte[] row;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()) {
+
+      @Override
+      public NonceGenerator getNonceGenerator() {
+        return NONCE_GENERATOR;
+      }
+
+    };
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
+    NONCE++;
+  }
+
+  @Test
+  public void testAppend() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    Result result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+    result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get();
+    // the second call should have no effect as we always generate the same nonce.
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+    result = table.get(new Get(row)).get();
+    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
+  }
+
+  @Test
+  public void testIncrement() throws InterruptedException, ExecutionException {
+    AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
+    assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
+    // the second call should have no effect as we always generate the same nonce.
+    assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue());
+    Result result = table.get(new Get(row)).get();
+    assertEquals(1L, Bytes.toLong(result.getValue(FAMILY, QUALIFIER)));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index b1ad172..c3b9963 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -500,19 +500,28 @@ public class TestMultiParallel {
     put.addColumn(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
 
     // Replace nonce manager with the one that returns each nonce twice.
-    NonceGenerator cnm = new PerClientRandomNonceGenerator() {
-      long lastNonce = -1;
+    NonceGenerator cnm = new NonceGenerator() {
+
+      private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
+
+      private long lastNonce = -1;
+
       @Override
-      public synchronized long newNonce() {
+      public long newNonce() {
         long nonce = 0;
         if (lastNonce == -1) {
-          lastNonce = nonce = super.newNonce();
+          lastNonce = nonce = delegate.newNonce();
         } else {
           nonce = lastNonce;
           lastNonce = -1L;
         }
         return nonce;
       }
+
+      @Override
+      public long getNonceGroup() {
+        return delegate.getNonceGroup();
+      }
     };
 
     NonceGenerator oldCnm =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index f1b559f..e34c9cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -321,7 +321,9 @@ public class TestDistributedLogSplitting {
     }
   }
 
-  private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
+  private static class NonceGeneratorWithDups implements NonceGenerator {
+
+    private final PerClientRandomNonceGenerator delegate = PerClientRandomNonceGenerator.get();
 
     private boolean isDups = false;
     private LinkedList<Long> nonces = new LinkedList<>();
@@ -331,12 +333,17 @@ public class TestDistributedLogSplitting {
 
     @Override
    public long newNonce() {
-      long nonce = isDups ? nonces.removeFirst() : super.newNonce();
+      long nonce = isDups ? nonces.removeFirst() : delegate.newNonce();
       if (!isDups) {
         nonces.add(nonce);
       }
       return nonce;
     }
+
+    @Override
+    public long getNonceGroup() {
+      return delegate.getNonceGroup();
+    }
   }
 
   @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000)
-- 
2.7.4