diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
index 16b8281..2859ef7 100644
--- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
+++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
@@ -225,6 +225,7 @@ public abstract class HBaseTestCase extends TestCase {
startKeyBytes = START_KEY_BYTES;
}
return addContent(new HRegionIncommon(r), Bytes.toString(column),
+ null,
startKeyBytes, endKey, -1);
}
@@ -237,11 +238,16 @@ public abstract class HBaseTestCase extends TestCase {
* @throws IOException
* @return count of what we added.
*/
- protected static long addContent(final Incommon updater, final String column)
- throws IOException {
+ protected static long addContent(final Incommon updater,
+ final String column) throws IOException {
return addContent(updater, column, START_KEY_BYTES, null);
}
+ protected static long addContent(final Incommon updater, final String family,
+ final String column) throws IOException {
+ return addContent(updater, family, column, START_KEY_BYTES, null);
+ }
+
/**
* Add content to region r on the passed column
* column.
@@ -256,7 +262,13 @@ public abstract class HBaseTestCase extends TestCase {
protected static long addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final byte [] endKey)
throws IOException {
- return addContent(updater, column, startKeyBytes, endKey, -1);
+ return addContent(updater, column, null, startKeyBytes, endKey, -1);
+ }
+
+ protected static long addContent(final Incommon updater, final String family,
+ final String column, final byte [] startKeyBytes,
+ final byte [] endKey) throws IOException {
+ return addContent(updater, family, column, startKeyBytes, endKey, -1);
}
/**
@@ -271,7 +283,8 @@ public abstract class HBaseTestCase extends TestCase {
* @return count of what we added.
* @throws IOException
*/
- protected static long addContent(final Incommon updater, final String column,
+ protected static long addContent(final Incommon updater, final String columnFamily,
+ final String column,
final byte [] startKeyBytes, final byte [] endKey, final long ts)
throws IOException {
long count = 0;
@@ -295,7 +308,9 @@ public abstract class HBaseTestCase extends TestCase {
put.setTimeStamp(ts);
}
try {
- put.add(Bytes.toBytes(column), null, t);
+ put.add(Bytes.toBytes(columnFamily),
+ (column == null ? null : Bytes.toBytes(column)),
+ t);
updater.put(put);
count++;
} catch (RuntimeException ex) {
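
For reviewers, a minimal usage sketch of the new overloads, which split the old combined column string into separate family and qualifier arguments. This is illustrative only; hri is assumed to be an HRegionIncommon wrapping an open test region:

    // New style: family and qualifier passed separately.
    long added = addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
        Bytes.toString(HConstants.REGIONINFO_QUALIFIER));

    // A null qualifier is allowed: the put.add() call above guards with
    // (column == null ? null : Bytes.toBytes(column)) and writes against the family alone.
    long added2 = addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), null);
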
diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
index 17d48dc..804e535 100644
--- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
+++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
@@ -56,7 +56,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
Bytes.toBytes("yyy")
};
- protected final byte [] columnName;
+ protected final byte [] columnFamily;
protected HTableDescriptor desc;
/**
@@ -64,7 +64,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
*/
public MultiRegionTable(final String columnName) {
super();
- this.columnName = Bytes.toBytes(columnName);
+ this.columnFamily = Bytes.toBytes(columnName);
// These are needed for the new and improved Map/Reduce framework
System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
@@ -101,7 +101,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
private HRegion createARegion(byte [] startKey, byte [] endKey) throws IOException {
HRegion region = createNewHRegion(desc, startKey, endKey);
- addContent(region, this.columnName);
+ addContent(region, this.columnFamily);
closeRegionAndDeleteLog(region);
return region;
}
diff --git a/src/test/org/apache/hadoop/hbase/client/TestClient.java b/src/test/org/apache/hadoop/hbase/client/TestClient.java
index 388b450..871af1a 100644
--- a/src/test/org/apache/hadoop/hbase/client/TestClient.java
+++ b/src/test/org/apache/hadoop/hbase/client/TestClient.java
@@ -1,3 +1,23 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hbase.client;
import java.io.IOException;
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
index baf1772..d240422 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java
@@ -361,20 +361,19 @@ public class TestScanner extends HBaseTestCase {
}
/**
+ * Tests doing a sync flush during the middle of a scan. This essentially exercises the
+ * StoreScanner update-readers code. It is not highly concurrent, since it all runs in one thread.
* HBase-910.
* @throws Exception
*/
- public void testScanAndConcurrentFlush() throws Exception {
+ public void testScanAndSyncFlush() throws Exception {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
HRegionIncommon hri = new HRegionIncommon(r);
try {
- String columnString = Bytes.toString(HConstants.CATALOG_FAMILY) + ':' +
- Bytes.toString(HConstants.REGIONINFO_QUALIFIER);
- LOG.info("Added: " + addContent(hri, columnString));
- int count = count(hri, -1);
- assertEquals(count, count(hri, 100));
- assertEquals(count, count(hri, 0));
- assertEquals(count, count(hri, count - 1));
+ LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
+ Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
+ int count = count(hri, -1, false);
+ assertEquals(count, count(hri, 100, false)); // do a sync flush.
} catch (Exception e) {
LOG.error("Failed", e);
throw e;
@@ -384,26 +383,73 @@ public class TestScanner extends HBaseTestCase {
shutdownDfs(cluster);
}
}
+
+ /**
+ * Tests doing a truly concurrent flush (from a second thread) while scanning. This exercises
+ * both the StoreScanner update-readers path and the memcache -> snapshot -> store file transition.
+ *
+ * @throws Exception
+ */
+ public void testScanAndRealConcurrentFlush() throws Exception {
+ this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+ HRegionIncommon hri = new HRegionIncommon(r);
+ try {
+ LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
+ Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
+ int count = count(hri, -1, false);
+ assertEquals(count, count(hri, 100, true)); // do a truly concurrent flush from a background thread
+ } catch (Exception e) {
+ LOG.error("Failed", e);
+ throw e;
+ } finally {
+ this.r.close();
+ this.r.getLog().closeAndDelete();
+ shutdownDfs(cluster);
+ }
+ }
+
/*
* @param hri Region
* @param flushIndex At what row we start the flush.
+ * @param concurrent whether the flush should run concurrently (in a second thread) or synchronously.
* @return Count of rows found.
* @throws IOException
*/
- private int count(final HRegionIncommon hri, final int flushIndex)
+ private int count(final HRegionIncommon hri, final int flushIndex,
+ boolean concurrent)
throws IOException {
LOG.info("Taking out counting scan");
ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
List values = new ArrayList();
int count = 0;
+ boolean justFlushed = false;
while (s.next(values)) {
+ if (justFlushed) {
+ LOG.info("next() returned the first row after the flush");
+ justFlushed = false;
+ }
count++;
if (flushIndex == count) {
LOG.info("Starting flush at flush index " + flushIndex);
- hri.flushcache();
- LOG.info("Finishing flush");
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ hri.flushcache();
+ LOG.info("Finishing flush");
+ } catch (IOException e) {
+ LOG.info("Failed flush cache");
+ }
+ }
+ };
+ if (concurrent) {
+ t.start(); // concurrently flush.
+ } else {
+ t.run(); // sync flush
+ }
+ LOG.info("Continuing on after kicking off background flush");
+ justFlushed = true;
}
}
s.close();
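
The run()-versus-start() switch above is the heart of the new test pair: the same flush body either executes inline on the scanning thread (a sync flush) or on a real second thread racing the scan. A standalone sketch of that pattern, independent of HBase (class and message names here are illustrative):

    public class FlushSwitchDemo {
      public static void main(String[] args) throws InterruptedException {
        final boolean concurrent = args.length > 0 && Boolean.parseBoolean(args[0]);
        Thread t = new Thread() {
          public void run() {
            // Stands in for hri.flushcache() in the test.
            System.out.println("flushing on " + Thread.currentThread().getName());
          }
        };
        if (concurrent) {
          t.start(); // real concurrency: the flush races the caller
          t.join();  // demo-only; the test keeps scanning instead of joining
        } else {
          t.run();   // ordinary method call: completes before the caller continues
        }
      }
    }
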