From 40b359def990e5c064180ab1e1b3484d8bc3776e Mon Sep 17 00:00:00 2001 From: chenheng Date: Tue, 13 Sep 2016 11:30:06 +0800 Subject: [PATCH] HBASE-16610 Unify append, increment with AP --- .../org/apache/hadoop/hbase/client/HTable.java | 72 +++++++++++++++------- .../hadoop/hbase/protobuf/ResponseConverter.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 1 - 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index bcbb1da..7fbbfa4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -635,20 +635,28 @@ public class HTable implements Table { @Override public Result append(final Append append) throws IOException { checkHasFamilies(append); - NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, + NoncedRegionServerCallable callable = + new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), append.getRow()) { @Override - protected Result rpcCall() throws Exception { + protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); MutateResponse response = getStub().mutate(getRpcController(), request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); } }; - return rpcCallerFactory. newCaller(this.writeRpcTimeout). - callWithRetries(callable, this.operationTimeout); + + List rows = new ArrayList(); + rows.add(append); + Object[] results = new Object[1]; + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, + null, results, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw convertSingleActionError(ars.getErrors()); + } + return ((SingleResponse.Entry)results[0]).getResult(); } /** @@ -657,20 +665,27 @@ public class HTable implements Table { @Override public Result increment(final Increment increment) throws IOException { checkHasFamilies(increment); - NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, + NoncedRegionServerCallable callable = + new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), increment.getRow()) { @Override - protected Result rpcCall() throws Exception { + protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); MutateResponse response = getStub().mutate(getRpcController(), request); - // Should this check for null like append does? - return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); } }; - return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + List rows = new ArrayList(); + rows.add(increment); + Object[] results = new Object[1]; + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, + null, results, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw convertSingleActionError(ars.getErrors()); + } + return ((SingleResponse.Entry)results[0]).getResult(); } /** @@ -703,21 +718,34 @@ public class HTable implements Table { "Invalid arguments to incrementColumnValue", npe); } - NoncedRegionServerCallable callable = - new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), - row) { + NoncedRegionServerCallable callable = + new NoncedRegionServerCallable(this.connection, + this.rpcControllerFactory, getName(), row) { @Override - protected Long rpcCall() throws Exception { + protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount, durability, getNonceGroup(), getNonce()); MutateResponse response = getStub().mutate(getRpcController(), request); - Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); } }; - return rpcCallerFactory. newCaller(this.writeRpcTimeout). - callWithRetries(callable, this.operationTimeout); + List rows = new ArrayList(); + rows.add(new Increment(row)); + Object[] results = new Object[1]; + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, + null, results, callable, operationTimeout); + ars.waitUntilDone(); + if (ars.hasError()) { + throw convertSingleActionError(ars.getErrors()); + } + Result result = ((SingleResponse.Entry)results[0]).getResult(); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } + + private IOException convertSingleActionError(RetriesExhaustedWithDetailsException e) { + assert e.getCauses().size() == 1; + return (IOException) e.getCauses().get(0); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java index e5deabd..969c83f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java @@ -157,7 +157,9 @@ public final class ResponseConverter { throws IOException { SingleResponse singleResponse = new SingleResponse(); SingleResponse.Entry entry = new SingleResponse.Entry(); - entry.setResult(ProtobufUtil.toResult(response.getResult(), cells)); + if (response.hasResult()) { + entry.setResult(ProtobufUtil.toResult(response.getResult(), cells)); + } entry.setProcessed(response.getProcessed()); singleResponse.setEntry(entry); return singleResponse; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0e07ae0..c643fa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -196,7 +196,6 @@ import org.mortbay.jetty.servlet.Context; public class HMaster extends HRegionServer implements MasterServices { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); - /** * Protection against zombie master. Started once Master accepts active responsibility and * starts taking over responsibilities. Allows a finite time window before giving up ownership. -- 2.9.3