diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 16b8281..2859ef7 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -225,6 +225,7 @@ public abstract class HBaseTestCase extends TestCase { startKeyBytes = START_KEY_BYTES; } return addContent(new HRegionIncommon(r), Bytes.toString(column), + null, startKeyBytes, endKey, -1); } @@ -237,11 +238,16 @@ public abstract class HBaseTestCase extends TestCase { * @throws IOException * @return count of what we added. */ - protected static long addContent(final Incommon updater, final String column) - throws IOException { + protected static long addContent(final Incommon updater, + final String column) throws IOException { return addContent(updater, column, START_KEY_BYTES, null); } + protected static long addContent(final Incommon updater, final String family, + final String column) throws IOException { + return addContent(updater, family, column, START_KEY_BYTES, null); + } + /** * Add content to region r on the passed column * column. @@ -256,7 +262,13 @@ public abstract class HBaseTestCase extends TestCase { protected static long addContent(final Incommon updater, final String column, final byte [] startKeyBytes, final byte [] endKey) throws IOException { - return addContent(updater, column, startKeyBytes, endKey, -1); + return addContent(updater, column, null, startKeyBytes, endKey, -1); + } + + protected static long addContent(final Incommon updater, final String family, + final String column, final byte [] startKeyBytes, + final byte [] endKey) throws IOException { + return addContent(updater, family, column, startKeyBytes, endKey, -1); } /** @@ -271,7 +283,8 @@ public abstract class HBaseTestCase extends TestCase { * @return count of what we added. * @throws IOException */ - protected static long addContent(final Incommon updater, final String column, + protected static long addContent(final Incommon updater, final String columnFamily, + final String column, final byte [] startKeyBytes, final byte [] endKey, final long ts) throws IOException { long count = 0; @@ -295,7 +308,9 @@ public abstract class HBaseTestCase extends TestCase { put.setTimeStamp(ts); } try { - put.add(Bytes.toBytes(column), null, t); + put.add(Bytes.toBytes(columnFamily), + (column == null ? null : Bytes.toBytes(column)), + t); updater.put(put); count++; } catch (RuntimeException ex) { diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java index 17d48dc..804e535 100644 --- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -56,7 +56,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { Bytes.toBytes("yyy") }; - protected final byte [] columnName; + protected final byte [] columnFamily; protected HTableDescriptor desc; /** @@ -64,7 +64,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { */ public MultiRegionTable(final String columnName) { super(); - this.columnName = Bytes.toBytes(columnName); + this.columnFamily = Bytes.toBytes(columnName); // These are needed for the new and improved Map/Reduce framework System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir")); conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir")); @@ -101,7 +101,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { private HRegion createARegion(byte [] startKey, byte [] endKey) throws IOException { HRegion region = createNewHRegion(desc, startKey, endKey); - addContent(region, this.columnName); + addContent(region, this.columnFamily); closeRegionAndDeleteLog(region); return region; } diff --git a/src/test/org/apache/hadoop/hbase/client/TestClient.java b/src/test/org/apache/hadoop/hbase/client/TestClient.java index 388b450..871af1a 100644 --- a/src/test/org/apache/hadoop/hbase/client/TestClient.java +++ b/src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -1,3 +1,23 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.client; import java.io.IOException; diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java index baf1772..d240422 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java @@ -361,20 +361,19 @@ public class TestScanner extends HBaseTestCase { } /** + * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner + * update readers code essentially. This is not highly concurrent, since its all 1 thread. * HBase-910. * @throws Exception */ - public void testScanAndConcurrentFlush() throws Exception { + public void testScanAndSyncFlush() throws Exception { this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); HRegionIncommon hri = new HRegionIncommon(r); try { - String columnString = Bytes.toString(HConstants.CATALOG_FAMILY) + ':' + - Bytes.toString(HConstants.REGIONINFO_QUALIFIER); - LOG.info("Added: " + addContent(hri, columnString)); - int count = count(hri, -1); - assertEquals(count, count(hri, 100)); - assertEquals(count, count(hri, 0)); - assertEquals(count, count(hri, count - 1)); + LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), + Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); + int count = count(hri, -1, false); + assertEquals(count, count(hri, 100, false)); // do a sync flush. } catch (Exception e) { LOG.error("Failed", e); throw e; @@ -384,26 +383,73 @@ public class TestScanner extends HBaseTestCase { shutdownDfs(cluster); } } + + /** + * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both + * the StoreScanner update readers and the transition from memcache -> snapshot -> store file. + * + * @throws Exception + */ + public void testScanAndRealConcurrentFlush() throws Exception { + this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); + HRegionIncommon hri = new HRegionIncommon(r); + try { + LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), + Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); + int count = count(hri, -1, false); + assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush + } catch (Exception e) { + LOG.error("Failed", e); + throw e; + } finally { + this.r.close(); + this.r.getLog().closeAndDelete(); + shutdownDfs(cluster); + } + } + /* * @param hri Region * @param flushIndex At what row we start the flush. + * @param concurrent if the flush should be concurrent or sync. * @return Count of rows found. * @throws IOException */ - private int count(final HRegionIncommon hri, final int flushIndex) + private int count(final HRegionIncommon hri, final int flushIndex, + boolean concurrent) throws IOException { LOG.info("Taking out counting scan"); ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS, HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP); List values = new ArrayList(); int count = 0; + boolean justFlushed = false; while (s.next(values)) { + if (justFlushed) { + LOG.info("after next() just after next flush"); + justFlushed=false; + } count++; if (flushIndex == count) { LOG.info("Starting flush at flush index " + flushIndex); - hri.flushcache(); - LOG.info("Finishing flush"); + Thread t = new Thread() { + public void run() { + try { + hri.flushcache(); + LOG.info("Finishing flush"); + } catch (IOException e) { + LOG.info("Failed flush cache"); + } + } + }; + if (concurrent) { + t.start(); // concurrently flush. + } else { + t.run(); // sync flush + } + LOG.info("Continuing on after kicking off background flush"); + justFlushed = true; } } s.close();