From d19fe97201bc93c19e10824434350717f46996c1 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 26 May 2017 11:43:47 +0800 Subject: [PATCH] HBASE-18042 Client Compatibility breaks between versions 1.2 and 1.3 --- .../hadoop/hbase/regionserver/RSRpcServices.java | 42 ++++--- .../hbase/client/TestAlwaysSetScannerId.java | 5 +- .../hadoop/hbase/client/TestLeaseRenewal.java | 3 +- .../hbase/client/TestScanWithoutFetchingData.java | 132 +++++++++++++++++++++ 4 files changed, 161 insertions(+), 21 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d3d64c8..d37a287 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -43,6 +45,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.mutable.MutableObject; @@ -249,7 +252,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final AtomicLong scannerIdGen = new AtomicLong(0L); private final ConcurrentMap scanners = new ConcurrentHashMap<>(); - + // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients + // which may send next or close request to a region scanner which has already been exhausted. The + // entries will be removed automatically after scannerLeaseTimeoutPeriod. + private final Cache closedScanners; /** * The lease timeout period for client scanners (milliseconds). */ @@ -1024,6 +1030,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); + + closedScanners = CacheBuilder.newBuilder() + .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); } @Override @@ -2430,18 +2439,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String scannerName = Long.toString(request.getScannerId()); RegionScannerHolder rsh = scanners.get(scannerName); if (rsh == null) { - // just ignore the close request if scanner does not exists. - if (request.hasCloseScanner() && request.getCloseScanner()) { + // just ignore the next or close request if scanner does not exists. + if (closedScanners.getIfPresent(scannerName) != null) { throw SCANNER_ALREADY_CLOSED; } else { LOG.warn("Client tried to access missing scanner " + scannerName); throw new UnknownScannerException( - "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " - + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " - + "long wait between consecutive client checkins, c) Server may be closing down, " - + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " - + "possible fix would be increasing the value of" - + "'hbase.client.scanner.timeout.period' configuration."); + "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " + + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + + "long wait between consecutive client checkins, c) Server may be closing down, " + + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + + "possible fix would be increasing the value of" + + "'hbase.client.scanner.timeout.period' configuration."); } } HRegionInfo hri = rsh.s.getRegionInfo(); @@ -2658,14 +2667,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } values.clear(); } - if (limitReached || moreRows) { - // We stopped prematurely - builder.setMoreResultsInRegion(true); - } else { - // We didn't get a single batch - builder.setMoreResultsInRegion(false); - } - + builder.setMoreResultsInRegion(moreRows); // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. if (trackMetrics) { @@ -2835,6 +2837,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!done) { moreResultsInRegion = scan((PayloadCarryingRpcController) controller, request, rsh, isSmallScan, maxQuotaResultSize, rows, results, builder, lastBlock, context); + } else { + moreResultsInRegion = !results.isEmpty(); } } @@ -2853,9 +2857,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scannerClosed = true; closeScanner(region, scanner, scannerName, context); } + builder.setMoreResultsInRegion(moreResultsInRegion); builder.setMoreResults(moreResults); return builder.build(); - } catch (Exception e) { + } catch (IOException e) { try { // scanner is closed here scannerClosed = true; @@ -2922,6 +2927,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } + closedScanners.put(scannerName, scannerName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java index d5a7e67..c727450 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -86,7 +87,7 @@ public class TestAlwaysSetScannerId { long scannerId = resp.getScannerId(); int nextCallSeq = 0; // test next - for (int i = 0; i < 5; i++) { + for (int i = 0; i < COUNT / 2; i++) { req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false); resp = STUB.scan(null, req); assertTrue(resp.hasScannerId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java index 3333f54..781ddd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -119,7 +119,8 @@ public class TestLeaseRenewal { assertTrue(((AbstractClientScanner)rs).renewLease()); // make sure we haven't advanced the scanner assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); - assertTrue(((AbstractClientScanner)rs).renewLease()); + // renewLease should return false now as we have read all the data already + assertFalse(((AbstractClientScanner) rs).renewLease()); // make sure scanner is exhausted now assertNull(rs.next()); // renewLease should return false now diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java new file mode 100644 index 0000000..664e538 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ServiceException; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase to make sure that we do not close scanners if ScanRequest.numberOfRows is zero. See + * HBASE-18042 for more details. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestScanWithoutFetchingData { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final int COUNT = 10; + + private static HRegionInfo HRI; + + private static ClientProtos.ClientService.BlockingInterface STUB; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + try (Table table = UTIL.createTable(TABLE_NAME, CF)) { + for (int i = 0; i < COUNT; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + HRI = UTIL.getHBaseAdmin().getTableRegions(TABLE_NAME).get(0); + STUB = ((HConnectionImplementation) UTIL.getConnection()) + .getClient(UTIL.getHBaseCluster().getRegionServer(0).getServerName()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void assertResult(int row, Result result) { + assertEquals(row, Bytes.toInt(result.getRow())); + assertEquals(row, Bytes.toInt(result.getValue(CF, CQ))); + } + + @Test + public void test() throws ServiceException, IOException { + Scan scan = new Scan(); + ScanRequest req = RequestConverter.buildScanRequest(HRI.getRegionName(), scan, 0, false); + PayloadCarryingRpcController hrc = new PayloadCarryingRpcController(); + ScanResponse resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length); + long scannerId = resp.getScannerId(); + int nextCallSeq = 0; + // test normal next + for (int i = 0; i < COUNT / 2; i++) { + req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp); + assertEquals(1, results.length); + assertResult(i, results[0]); + } + // test zero next + req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, false); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length); + for (int i = COUNT / 2; i < COUNT; i++) { + req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertEquals(i != COUNT - 1, resp.getMoreResultsInRegion()); + Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp); + assertEquals(1, results.length); + assertResult(i, results[0]); + } + // close + req = RequestConverter.buildScanRequest(scannerId, 0, true, false); + resp = STUB.scan(null, req); + } +} -- 2.7.4