### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (revision 1521712) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (working copy) @@ -448,6 +448,31 @@ } /** + * Create a protocol buffer ScanRequest for a client Scan + * + * @param regionName + * @param scan + * @param numberOfRows + * @param closeScanner + * @param nextCallSeq + * @return a scan request + * @throws IOException + */ + public static ScanRequest buildScanRequest(final byte[] regionName, + final Scan scan, final int numberOfRows, final boolean closeScanner, + final long nextCallSeq) throws IOException { + ScanRequest.Builder builder = ScanRequest.newBuilder(); + RegionSpecifier region = buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + builder.setNumberOfRows(numberOfRows); + builder.setCloseScanner(closeScanner); + builder.setRegion(region); + builder.setScan(ProtobufUtil.toScan(scan)); + builder.setNextCallSeq(nextCallSeq); + return builder.build(); + } + + /** * Create a protocol buffer ScanRequest for a scanner id * * @param scannerId Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (revision 1521633) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (working copy) @@ -146,7 +146,7 @@ } } else { if (scannerId == -1L) { - this.scannerId = openScanner(); + return openScannerAndFetchResults(); } else { Result [] rrs = null; ScanRequest request = null; @@ -283,21 +283,40 @@ this.scannerId = -1L; } - protected long openScanner() throws IOException { + protected Result[] openScannerAndFetchResults() throws IOException { incRPCcallsMetrics(); - ScanRequest request = - RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), - this.scan, 0, false); try { - ScanResponse response = getStub().scan(null, request); - long id = response.getScannerId(); + ScanRequest request = RequestConverter.buildScanRequest(getLocation() + .getRegionInfo().getRegionName(), this.scan, caching, false, + nextCallSeq); + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + ScanResponse response = getStub().scan(controller, request); + nextCallSeq++; + this.scannerId = response.getScannerId(); if (logScannerActivity) { - LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + LOG.info("Open scanner=" + scannerId + " for scan=" + scan.toString() + " on region " + getLocation().toString() + " ip:" + getLocation().getHostnamePort()); } - return id; + + long timestamp = System.currentTimeMillis(); + // Results are returned via controller + CellScanner cellScanner = controller.cellScanner(); + Result[] rrs = ResponseConverter.getResults(cellScanner, response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + + " rows from scanner=" + scannerId); + } + } + if (response.hasMoreResults() && !response.getMoreResults()) { + scannerId = -1L; + closed = true; + return null; + } + return rrs; } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1521712) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -237,7 +237,13 @@ callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region - this.caller.callWithRetries(callable); + Result [] values =this.caller.callWithRetries(callable); + if (values != null && values.length > 0) { + for (Result rs : values) { + cache.add(rs); + this.lastResult = rs; + } + } this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet();