From 9ff2e97cb14187d76d949fea99163e31d75b4e75 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 18 Apr 2017 20:25:37 +0800 Subject: [PATCH] HBASE-17936 --- src/main/asciidoc/_chapters/cp.adoc | 60 +++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/src/main/asciidoc/_chapters/cp.adoc b/src/main/asciidoc/_chapters/cp.adoc index d3fcd47baa..d0dcfef780 100644 --- a/src/main/asciidoc/_chapters/cp.adoc +++ b/src/main/asciidoc/_chapters/cp.adoc @@ -610,7 +610,7 @@ The effect is that the duplicate coprocessor is effectively ignored. + [source, java] ---- -public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService { +public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; @@ -630,31 +630,33 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS @Override public void stop(CoprocessorEnvironment env) throws IOException { - // do mothing + // do nothing } @Override - public void getSum(RpcController controller, SumRequest request, RpcCallback done) { + public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback done) { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(request.getFamily())); scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn())); - SumResponse response = null; + + Sum.SumResponse response = null; InternalScanner scanner = null; + try { scanner = env.getRegion().getScanner(scan); - List results = new ArrayList(); + List results = new ArrayList<>(); boolean hasMore = false; - long sum = 0L; - do { - hasMore = scanner.next(results); - for (Cell cell : results) { - sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); - } - results.clear(); - } while (hasMore); + long sum = 0L; - response = SumResponse.newBuilder().setSum(sum).build(); + do { + hasMore = scanner.next(results); + for (Cell cell : results) { + sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); + } + results.clear(); + } while (hasMore); + response = Sum.SumResponse.newBuilder().setSum(sum).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { @@ -664,6 +666,7 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS } catch (IOException ignored) {} } } + done.run(response); } } @@ -681,24 +684,29 @@ Table table = connection.getTable(tableName); //HConnection connection = HConnectionManager.createConnection(conf); //HTableInterface table = connection.getTable("users"); -final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross") - .build(); +final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build(); try { -Map results = table.CoprocessorService (SumService.class, null, null, -new Batch.Call() { - @Override - public Long call(SumService aggregate) throws IOException { -BlockingRpcCallback rpcCallback = new BlockingRpcCallback(); - aggregate.getSum(null, request, rpcCallback); - SumResponse response = rpcCallback.get(); - return response.hasSum() ? response.getSum() : 0L; + Map results = table.coprocessorService( + Sum.SumService.class, + null, /* start key */ + null, /* end key */ + new Batch.Call() { + @Override + public Long call(Sum.SumService aggregate) throws IOException { + BlockingRpcCallback rpcCallback = new BlockingRpcCallback<>(); + aggregate.getSum(null, request, rpcCallback); + Sum.SumResponse response = rpcCallback.get(); + + return response.hasSum() ? response.getSum() : 0L; + } } - }); + ); + for (Long sum : results.values()) { System.out.println("Sum = " + sum); } } catch (ServiceException e) { -e.printStackTrace(); + e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); } -- 2.11.0 (Apple Git-81)