diff --git src/test/java/org/apache/hadoop/hbase/client/TestClientScan.java src/test/java/org/apache/hadoop/hbase/client/TestClientScan.java new file mode 100644 index 0000000..0fb686c --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/client/TestClientScan.java @@ -0,0 +1,133 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test data loss in scan with client retries + */ +@Category(MediumTests.class) +public class TestClientScan { + final Log LOG = LogFactory.getLog(getClass()); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static String TABLE = "TestClientScan"; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + protected static int SLAVES = 3; + + public static class DelayNextCoprocessor extends BaseRegionObserver implements + RegionServerObserver { + private int counter = 0; + @Override + public void preStopRegionServer(ObserverContext env) + throws IOException { + } + @Override + public void start(CoprocessorEnvironment e) throws IOException { + } + @Override + public boolean + postScannerNext(final ObserverContext e, + final InternalScanner s, final List results, final int limit, + final boolean hasMore) throws IOException { + HRegion r = e.getEnvironment().getRegion(); + if (r.getTableDesc().getNameAsString().equals(TABLE)) { + counter++; + if (counter % 2 == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + return hasMore; + } + } + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + DelayNextCoprocessor.class.getName()); + conf.setInt("hbase.client.retries.number", 2); + conf.setInt("hbase.rpc.timeout", 100); + conf.setInt("hbase.client.pause", 100); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCallRetryInScan() throws IOException { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE), FAMILY); + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(FAMILY, QUALIFIER, Bytes.toBytes(i)); + table.put(put); + } + Scan scan = new Scan(); + ClientScanner scanner = (ClientScanner) table.getScanner(scan); + + int id = 0; + for (Result r : scanner) { + assertEquals("row" + id, Bytes.toString(r.getRow())); + id++; + } + assertEquals(10, id); + scanner.close(); + table.close(); + } +}