From df9a13892ec8e684c6e131289aba72fe22476324 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 17 Oct 2016 14:14:46 +0800 Subject: [PATCH] HBASE-16836 Implement increment and append --- .../hadoop/hbase/client/AsyncConnectionImpl.java | 10 +++ .../org/apache/hadoop/hbase/client/AsyncTable.java | 83 +++++++++++++++++-- .../apache/hadoop/hbase/client/AsyncTableImpl.java | 87 ++++++++++++++------ .../hbase/client/ConnectionImplementation.java | 17 +--- .../hadoop/hbase/client/ConnectionUtils.java | 19 +++++ .../org/apache/hadoop/hbase/client/HTable.java | 27 +++---- .../apache/hadoop/hbase/client/NonceGenerator.java | 6 +- .../hbase/client/CoprocessorHConnection.java | 5 +- .../apache/hadoop/hbase/client/TestAsyncTable.java | 92 +++++++++++++++++----- 9 files changed, 266 insertions(+), 80 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index c50e244..1a1fd04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; +import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; import io.netty.util.HashedWheelTimer; @@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionUtils.NoNonceGenerator; import org.apache.hadoop.hbase.ipc.RpcClient; import 
org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -80,8 +82,11 @@ class AsyncConnectionImpl implements AsyncConnection { final AsyncRpcRetryingCallerFactory callerFactory; + final NonceGenerator nonceGenerator; + private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); + @SuppressWarnings("deprecation") public AsyncConnectionImpl(Configuration conf, User user) throws IOException { this.conf = conf; this.user = user; @@ -103,6 +108,11 @@ class AsyncConnectionImpl implements AsyncConnection { this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); + if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { + nonceGenerator = new PerClientRandomNonceGenerator(); + } else { + nonceGenerator = new NoNonceGenerator(); + } } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index c4e7cec..a9f6667 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.base.Preconditions; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -24,6 +26,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}. 
@@ -99,16 +103,22 @@ public interface AsyncTable { * This will return true if the Get matches one or more keys, false if not. *

* This is a server-side call so it prevents any data from being transfered to the client. + * @return true if the specified Get matches one or more keys, false if not */ - CompletableFuture exists(Get get); + default CompletableFuture exists(Get get) { + if (!get.isCheckExistenceOnly()) { + get = ReflectionUtils.newInstance(get.getClass(), get); + get.setCheckExistenceOnly(true); + } + return get(get).thenApply(r -> r.getExists()); + } /** * Extracts certain cells from a given row. - *

- * Return the data coming from the specified row, if it exists. If the row specified doesn't - * exist, the {@link Result} instance returned won't contain any - * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. */ CompletableFuture get(Get get); @@ -123,4 +133,67 @@ public interface AsyncTable { * @param delete The object that specifies what to delete. */ CompletableFuture delete(Delete delete); + + /** + * Appends values to one or more columns within a single row. + *

+ * This operation does not appear atomic to readers. Appends are done under a single row lock, so + write operations to a row are synchronized, but readers do not take row locks so get and scan + operations can see this operation partially completed. + @param append object that specifies the columns and values to be used for the append + operations + @return values of columns after the append operation (may be null) + */ + CompletableFuture append(Append append); + + /** + * Increments one or more columns within a single row. + *

+ * This operation does not appear atomic to readers. Increments are done under a single row lock, + * so write operations to a row are synchronized, but readers do not take row locks so get and + * scan operations can see this operation partially completed. + * @param increment object that specifies the columns and amounts to be used for the increment + * operations + * @return values of columns after the increment + */ + CompletableFuture increment(Increment increment); + + /** + * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} + *

+ * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @return The new value, post increment. + */ + default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount) { + return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); + } + + /** + * Atomically increments a column value. If the column value already exists and is not a + * big-endian long, this could throw an exception. If the column value does not yet exist it is + * initialized to amount and written to the specified column. + *

+ * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose + * any increments that have not been flushed. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @param durability The persistence guarantee for this increment. + * @return The new value, post increment. + */ + default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, Durability durability) { + Preconditions.checkNotNull(row, "row is null"); + Preconditions.checkNotNull(family, "family is null"); + Preconditions.checkNotNull(qualifier, "qualifier is null"); + return increment( + new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) + .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index cbb4988..b08e810 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; + import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -35,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; -import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The implementation of AsyncTable. @@ -111,6 +112,45 @@ class AsyncTableImpl implements AsyncTable { return future; } + private static CompletableFuture mutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, + Converter respConverter) { + return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), + respConverter); + } + + private static CompletableFuture voidMutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert) { + return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { + return null; + }); + } + + private static Result toResult(HBaseRpcController controller, MutateResponse resp) + throws IOException { + if (!resp.hasResult()) { + return null; + } + return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); + } + + @FunctionalInterface + private interface NoncedConverter { + D convert(I info, S src, long nonceGroup, long nonce) throws IOException; + } + + private CompletableFuture noncedMutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + NoncedConverter reqConvert, + Converter respConverter) { + long nonceGroup = conn.nonceGenerator.getNonceGroup(); + long nonce = conn.nonceGenerator.newNonce(); + return mutate(controller, loc, stub, req, + (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); + } + private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { return conn.callerFactory. 
single().table(tableName).row(row.getRow()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) @@ -118,15 +158,6 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture exists(Get get) { - if (!get.isCheckExistenceOnly()) { - get = ReflectionUtils.newInstance(get.getClass(), get); - get.setCheckExistenceOnly(true); - } - return get(get).thenApply(r -> r.getExists()); - } - - @Override public CompletableFuture get(Get get) { return this. newCaller(get, readRpcTimeoutNs) .action((controller, loc, stub) -> AsyncTableImpl @@ -138,25 +169,35 @@ class AsyncTableImpl implements AsyncTable { @Override public CompletableFuture put(Put put) { - return this. newCaller(put, writeRpcTimeoutNs) - .action( - (controller, loc, stub) -> AsyncTableImpl. call( - controller, loc, stub, put, RequestConverter::buildMutateRequest, - (s, c, req, done) -> s.mutate(c, req, done), (c, resp) -> { - return null; - })) + return this + . newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl + . voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) .call(); } @Override public CompletableFuture delete(Delete delete) { return this. newCaller(delete, writeRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl - . call(controller, loc, stub, delete, - RequestConverter::buildMutateRequest, (s, c, req, done) -> s.mutate(c, req, done), - (c, resp) -> { - return null; - })) + .action((controller, loc, stub) -> AsyncTableImpl. voidMutate(controller, loc, stub, + delete, RequestConverter::buildMutateRequest)) + .call(); + } + + @Override + public CompletableFuture append(Append append) { + checkHasFamilies(append); + return this. newCaller(append, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. 
noncedMutate(controller, loc, stub, + append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult)) + .call(); + } + + @Override + public CompletableFuture increment(Increment increment) { + checkHasFamilies(increment); + return this. newCaller(increment, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. noncedMutate(controller, loc, + stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult)) .call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ae8c57e..69eac12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionUtils.NoNonceGenerator; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; @@ -108,7 +109,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Log LOG = LogFactory.getLog(ConnectionImplementation.class); - private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; + private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; private final boolean hostnamesCanChange; @@ -199,7 +200,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable { this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { + if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) { synchronized (nonceGeneratorCreateLock) { if (nonceGenerator == null) { nonceGenerator = new PerClientRandomNonceGenerator(); @@ -948,18 +949,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } - /** Dummy nonce generator for disabled nonces. */ - static class NoNonceGenerator implements NonceGenerator { - @Override - public long getNonceGroup() { - return HConstants.NO_NONCE; - } - @Override - public long newNonce() { - return HConstants.NO_NONCE; - } - } - /** * The record of errors for servers. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 2f5d2b1..e983454 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.io.IOException; import java.net.InetAddress; @@ -202,4 +203,22 @@ public final class ConnectionUtils { } return serviceName + "@" + hostname + ":" + port; } + + static void checkHasFamilies(Mutation mutation) { + Preconditions.checkArgument(mutation.numFamilies() > 0, + "Invalid arguments to %s, zero columns specified", mutation.toString()); + } + + /** Dummy nonce generator for disabled nonces. 
*/ + static final class NoNonceGenerator implements NonceGenerator { + @Override + public long getNonceGroup() { + return HConstants.NO_NONCE; + } + + @Override + public long newNonce() { + return HConstants.NO_NONCE; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 2802a2c..8d024dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -18,6 +18,16 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; + +import com.google.common.annotations.VisibleForTesting; +// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. +// Internally, we use shaded protobuf. This below are part of our public API. +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -49,8 +59,8 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.client.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +//SEE ABOVE NOTE! 
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -65,15 +75,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.annotations.VisibleForTesting; -// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. -// Internally, we use shaded protobuf. This below are part of our public API. -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -// SEE ABOVE NOTE! - /** * An implementation of {@link Table}. Used to communicate with a single HBase table. * Lightweight. Get as needed and just close when done. @@ -617,12 +618,6 @@ public class HTable implements Table { } } - private static void checkHasFamilies(final Mutation mutation) throws IOException { - if (mutation.numFamilies() == 0) { - throw new IOException("Invalid arguments to " + mutation + ", zero columns specified"); - } - } - /** * {@inheritDoc} */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java index a121dde..6b5e8a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NonceGenerator.java @@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface NonceGenerator { + static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; + /** @return the nonce group (client ID) of this client manager. */ - public long getNonceGroup(); + long getNonceGroup(); /** @return New nonce. 
*/ - public long newNonce(); + long newNonce(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index 2afe6cf..ccc8d39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.ConnectionUtils.NoNonceGenerator; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 7010c7f..41002cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -33,9 +38,12 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; @Category({ MediumTests.class, ClientTests.class }) public class TestAsyncTable { @@ -48,14 +56,17 @@ public class TestAsyncTable { private static byte[] QUALIFIER = Bytes.toBytes("cq"); - private static byte[] ROW = Bytes.toBytes("row"); - private static byte[] VALUE = Bytes.toBytes("value"); private static AsyncConnection ASYNC_CONN; + @Rule + public TestName testName = new TestName(); + + private byte[] row; + @BeforeClass - public static void setUp() throws Exception { + public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); @@ -63,22 +74,27 @@ public class TestAsyncTable { } @AfterClass - public static void tearDown() throws Exception { + public static void tearDownAfterClass() throws Exception { ASYNC_CONN.close(); TEST_UTIL.shutdownMiniCluster(); } + @Before + public void setUp() throws IOException, InterruptedException { + row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); + } + @Test - public void test() throws Exception { + public void testSimple() throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - assertTrue(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get()); - Result result = table.get(new Get(ROW).addColumn(FAMILY, 
QUALIFIER)).get(); + table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); + Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); - table.delete(new Delete(ROW)).get(); - result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get(); + table.delete(new Delete(row)).get(); + result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); assertTrue(result.isEmpty()); - assertFalse(table.exists(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get()); + assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); } private byte[] concat(byte[] base, int index) { @@ -86,24 +102,24 @@ public class TestAsyncTable { } @Test - public void testMultiple() throws Exception { + public void testSimpleMultiple() throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); int count = 100; CountDownLatch putLatch = new CountDownLatch(count); IntStream.range(0, count).forEach( - i -> table.put(new Put(concat(ROW, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) + i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) .thenAccept(x -> putLatch.countDown())); putLatch.await(); BlockingQueue existsResp = new ArrayBlockingQueue<>(count); IntStream.range(0, count) - .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) .thenAccept(x -> existsResp.add(x))); for (int i = 0; i < count; i++) { assertTrue(existsResp.take()); } BlockingQueue> getResp = new ArrayBlockingQueue<>(count); IntStream.range(0, count) - .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); for (int i = 0; i < count; 
i++) { Pair pair = getResp.take(); @@ -112,20 +128,60 @@ public class TestAsyncTable { } CountDownLatch deleteLatch = new CountDownLatch(count); IntStream.range(0, count).forEach( - i -> table.delete(new Delete(concat(ROW, i))).thenAccept(x -> deleteLatch.countDown())); + i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); deleteLatch.await(); IntStream.range(0, count) - .forEach(i -> table.exists(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) .thenAccept(x -> existsResp.add(x))); for (int i = 0; i < count; i++) { assertFalse(existsResp.take()); } IntStream.range(0, count) - .forEach(i -> table.get(new Get(concat(ROW, i)).addColumn(FAMILY, QUALIFIER)) + .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); for (int i = 0; i < count; i++) { Pair pair = getResp.take(); assertTrue(pair.getSecond().isEmpty()); } } + + @Test + public void testIncrement() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 100; + CountDownLatch latch = new CountDownLatch(count); + AtomicLong sum = new AtomicLong(0L); + IntStream.range(0, count) + .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { + sum.addAndGet(x); + latch.countDown(); + })); + latch.await(); + assertEquals(count, Bytes.toLong( + table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); + assertEquals((1 + count) * count / 2, sum.get()); + } + + @Test + public void testAppend() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int count = 10; + CountDownLatch latch = new CountDownLatch(count); + char suffix = ':'; + AtomicLong suffixCount = new AtomicLong(0L); + IntStream.range(0, count).forEachOrdered( + i -> table.append(new 
Append(row).add(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) + .thenAccept(r -> { + suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars() + .filter(x -> x == suffix).count()); + latch.countDown(); + })); + latch.await(); + assertEquals((1 + count) * count / 2, suffixCount.get()); + String value = Bytes.toString( + table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); + int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) + .sorted().toArray(); + assertArrayEquals(IntStream.range(0, count).toArray(), actual); + } } -- 2.7.4