Index: src/test/org/apache/hadoop/hbase/client/TestForceSplit.java =================================================================== --- src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (revision 929367) +++ src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (working copy) @@ -68,7 +68,10 @@ } if (regions == null) continue; this.regionCount.set(regions.size()); - if (this.regionCount.get() >= 2) break; + if (this.regionCount.get() >= 2) { + LOG.info("Region split!"); + break; + } LOG.debug("Cycle waiting on split"); } // Before leaving, print out the regions found in .META. @@ -110,6 +113,7 @@ HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(columnName)); HBaseAdmin admin = new HBaseAdmin(conf); + System.out.println("create table"); admin.createTable(htd); final HTable table = new HTable(conf, tableName); byte[] k = new byte[3]; @@ -150,11 +154,86 @@ // Scan first row so we are into first region before split happens. scanner.next(); - Thread t = new WaitOnSplit(table); + WaitOnSplit t = new WaitOnSplit(table); t.start(); // tell the master to split the table admin.split(Bytes.toString(tableName)); t.join(); + assertEquals("Expected 2 regions but table has " + t.getCount(), 2, t.getCount()); + + // Verify row count + rows = 1; // We counted one row above. + for (Result result : scanner) { + rows++; + if (rows > rowCount) { + scanner.close(); + assertTrue("Scanned more than expected (" + rowCount + ")", false); + } + } + scanner.close(); + assertEquals(rowCount, rows); + } + + private static byte [] multiTable = Bytes.toBytes("multiTable"); + private static byte [] familyOne = Bytes.toBytes("family1"); + private static byte [] familyTwo = Bytes.toBytes("family2"); + + /** + * Tests forcing split from client and having scanners successfully ride over split. + * @throws Exception + * @throws IOException + */ + public void testForceSplitMultiFamily() throws Exception { + // create the test table + HTableDescriptor htd = new HTableDescriptor(multiTable); + htd.addFamily(new HColumnDescriptor(familyOne)); + htd.addFamily(new HColumnDescriptor(familyTwo)); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(htd); + final HTable table = new HTable(conf, multiTable); + byte[] k = new byte[3]; + int rowCount = 0; + for (byte b1 = 'a'; b1 < 'z'; b1++) { + for (byte b2 = 'a'; b2 < 'z'; b2++) { + for (byte b3 = 'a'; b3 < 'z'; b3++) { + k[0] = b1; + k[1] = b2; + k[2] = b3; + Put put = new Put(k); + put.add(familyTwo, null, k); + table.put(put); + rowCount++; + } + } + } + + // get the initial layout (should just be one region) + Map m = table.getRegionsInfo(); + System.out.println("Initial regions (" + m.size() + "): " + m); + assertTrue(m.size() == 1); + + // Verify row count + Scan scan = new Scan(); + ResultScanner scanner = table.getScanner(scan); + int rows = 0; + for(Result result : scanner) { + rows++; + } + scanner.close(); + assertEquals(rowCount, rows); + + // Have an outstanding scan going on to make sure we can scan over splits. + scan = new Scan(); + scanner = table.getScanner(scan); + // Scan first row so we are into first region before split happens. + scanner.next(); + + WaitOnSplit t = new WaitOnSplit(table); + t.start(); + // tell the master to split the table + admin.split(Bytes.toString(multiTable)); + t.join(); + assertEquals("Expected 2 regions but table has " + t.getCount(), 2, t.getCount()); // Verify row count rows = 1; // We counted one row above. Index: src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 929367) +++ src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -862,6 +862,7 @@ try { LOG.info("" + addContent(region, fam3)); region.flushcache(); + region.shouldSplit(true); byte [] splitRow = region.compactStores(); assertNotNull(splitRow); LOG.info("SplitRow: " + Bytes.toString(splitRow)); @@ -1672,6 +1673,7 @@ try { LOG.info("" + addContent(region, fam3)); region.flushcache(); + region.shouldSplit(true); byte [] splitRow = region.compactStores(); assertNotNull(splitRow); LOG.info("SplitRow: " + Bytes.toString(splitRow)); Index: src/java/org/apache/hadoop/hbase/regionserver/Store.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 929367) +++ src/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy) @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -662,8 +663,8 @@ *

During this time, the Store can work as usual, getting values from * StoreFiles and writing new StoreFiles from the memstore. * - * Existing StoreFiles are not destroyed until the new compacted StoreFile is - * completely written-out to disk. + *

Existing StoreFiles are not destroyed until the new compacted StoreFile + * is completely written-out to disk. * *

The compactLock prevents multiple simultaneous compactions. * The structureLock prevents us from interfering with other write operations. @@ -671,12 +672,22 @@ *

We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. * + *

This method returns: + * + * - SKIP_STORE (null) if the Store does not match split criteria but should + * not prevent the Region from splitting if other Stores match the criteria + * + * - DO_NOT_SPLIT if the Store has references or this is a major compaction + * and we should definitely not split the Region regardless of other Stores + * + * - Constructed SplitInfo containing size and splitKey if this Store matches + * split criteria and the Region should be split. + * * @param mc True to force a major compaction regardless of thresholds - * @return row to split around if a split is needed, null otherwise + * @return null or a SplitInfo object * @throws IOException */ - StoreSize compact(final boolean mc) throws IOException { - boolean forceSplit = this.region.shouldSplit(false); + SplitInfo compact(final boolean mc, final boolean forceSplit) throws IOException { boolean majorcompaction = mc; synchronized (compactLock) { // filesToCompact are sorted oldest to newest. @@ -684,7 +695,7 @@ new ArrayList(this.storefiles.values()); if (filesToCompact.isEmpty()) { LOG.debug(this.storeNameStr + ": no store files to compact"); - return null; + return SKIP_STORE; } // Max-sequenceID is the last key of the storefiles TreeMap @@ -698,15 +709,27 @@ } boolean references = hasReferences(filesToCompact); - if (!majorcompaction && !references && - (forceSplit || (filesToCompact.size() < compactionThreshold))) { - return checkSplit(forceSplit); + if (!majorcompaction && !references) { + // If this is not a major compaction and we don't have references, + // we have the option to split instead of compact, or skip compaction + // all together. + if(forceSplit) { + // Forcing a split. This is either triggered by the user or was + // triggered post-flush because the aggregate size of a Store + // surpassed the maximum size (HBASE-2375). + return getSplitInfo(filesToCompact); + } else if(filesToCompact.size() < compactionThreshold) { + // If we haven't reached the compaction threshold, skip compaction. + return SKIP_STORE; + } } - + // Do not trigger any splits otherwise, so always return references=true + // which will prevent splitting. + if (!fs.exists(this.regionCompactionDir) && !fs.mkdirs(this.regionCompactionDir)) { LOG.warn("Mkdir on " + this.regionCompactionDir.toString() + " failed"); - return checkSplit(forceSplit); + return DO_NOT_SPLIT; } // HBASE-745, preparing all store file sizes for incremental compacting @@ -721,7 +744,7 @@ Path path = file.getPath(); if (path == null) { LOG.warn("Path is null for " + file); - return null; + return DO_NOT_SPLIT; } Reader r = file.getReader(); if (r == null) { @@ -754,7 +777,7 @@ StringUtils.humanReadableInt(totalSize) + "; Skipped " + point + " files, size: " + skipped); } - return checkSplit(forceSplit); + return DO_NOT_SPLIT; } if (LOG.isDebugEnabled()) { LOG.debug("Compaction size of " + this.storeNameStr + ": " + @@ -777,7 +800,7 @@ "; store size is " + StringUtils.humanReadableInt(storeSize)); } } - return checkSplit(forceSplit); + return DO_NOT_SPLIT; } /* @@ -1202,76 +1225,51 @@ } /** - * Determines if HStore can be split - * @param force Whether to force a split or not. - * @return a StoreSize if store can be split, null otherwise. + * Called to get split information for this Store. + * + * Returns the total aggregate size of Store (sum of StoreFile sizes) and + * the key to split on. + * + * The split key is determined by finding the median midKey across all + * StoreFiles in this Store. If an even number of StoreFiles, we use the + * bigger of the two medians to bias towards sequential write scenarios. + * + * @return the total size of the store and key to split on */ - StoreSize checkSplit(final boolean force) { + private SplitInfo getSplitInfo(List storeFiles) { this.lock.readLock().lock(); try { - // Iterate through all store files - if (this.storefiles.isEmpty()) { - return null; - } - if (!force && (storeSize < this.desiredMaxFileSize)) { - return null; - } - // Not splitable if we find a reference store file present in the store. - boolean splitable = true; - long maxSize = 0L; - Long mapIndex = Long.valueOf(0L); - for (Map.Entry e: storefiles.entrySet()) { - StoreFile sf = e.getValue(); - if (splitable) { - splitable = !sf.isReference(); - if (!splitable) { - // RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return. - if (LOG.isDebugEnabled()) { - LOG.debug(sf + " is not splittable"); - } - return null; - } - } - Reader r = sf.getReader(); - if (r == null) { - LOG.warn("Storefile " + sf + " Reader is null"); - continue; + // Iterate StoreFiles collecting their midKeys + // Also check to make sure we find more than one row + boolean onlyOneRow = true; + KeyValue [] midKeys = new KeyValue[storeFiles.size()]; + for(int i=0;i maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); + byte [] midKey = r.midkey(); + midKeys[i] = KeyValue.createKeyValueFromKey(midKey, 0, midKey.length); + if(onlyOneRow) { + // Check if there is only a single row in the current read + if(!containsOneRow(r)) { + onlyOneRow = false; + } } } - StoreFile sf = this.storefiles.get(mapIndex); - HFile.Reader r = sf.getReader(); - if (r == null) { - LOG.warn("Storefile " + sf + " Reader is null"); - return null; - } - // Get first, last, and mid keys. Midkey is the key that starts block - // in middle of hfile. Has column and timestamp. Need to return just - // the row we want to split on as midkey. - byte [] midkey = r.midkey(); - if (midkey != null) { - KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length); - byte [] fk = r.getFirstKey(); - KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length); - byte [] lk = r.getLastKey(); - KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length); - // if the midkey is the same as the first and last keys, then we cannot - // (ever) split this region. - if (this.comparator.compareRows(mk, firstKey) == 0 && - this.comparator.compareRows(mk, lastKey) == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot split because midkey is the same as first or " + - "last row"); - } - return null; + if(onlyOneRow) { + // Cannot split single row + if (LOG.isDebugEnabled()) { + LOG.debug("cannot split because midkey is the same as first and " + + "last row"); } - return new StoreSize(maxSize, mk.getRow()); + return null; } + // Find the median midKey + KeyValue midKey = getMedianKey(midKeys); + return new SplitInfo(storeSize, midKey.getRow()); } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeNameStr, e); } finally { @@ -1279,12 +1277,60 @@ } return null; } + + /** + * Returns true if this HFile contains only one row. + * @return + * @throws IOException + */ + private boolean containsOneRow(HFile.Reader r) throws IOException { + byte [] fk = r.getFirstKey(); + KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length); + byte [] lk = r.getLastKey(); + KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length); + // if the midkey is the same as the first and last keys, then we cannot + // (ever) split this region. + return Bytes.compareTo(firstKey.getBuffer(), firstKey.getRowOffset(), + firstKey.getRowLength(), lastKey.getBuffer(), lastKey.getRowOffset(), + lastKey.getRowLength()) == 0; + } + + /** + * Returns the median key from the specified array. + * + * Method sorts the array of keys and takes the median key. If there + * are an even number of keys, it biases towards the larger key. + * + * TODO: Use a weighted median so we don't skew the split key if there + * are small StoreFiles. + * + * @param keys array of keys (these will be sorted) + * @return median key + */ + KeyValue getMedianKey(KeyValue [] keys) { + Arrays.sort(keys, this.comparator); + int medianIndex = (int)Math.ceil((keys.length - 1) / (double)2); + return keys[medianIndex]; + } /** @return aggregate size of HStore */ public long getSize() { return storeSize; } + /** + * Returns true if this Store should be split because its size is greater + * than the desired maximum file size. + * + * This uses the aggregate size of all StoreFiles in a Store, rather than + * the biggest single StoreFile (HBASE-2375). + * + * @return true if the Store meets the split condition, false if not + */ + public boolean shouldSplit() { + return storeSize >= desiredMaxFileSize; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -1346,18 +1392,32 @@ return size; } - /* - * Datastructure that holds size and row to split a file around. - * TODO: Take a KeyValue rather than row. + // Static SplitInfos to make code more readable + private static final SplitInfo DO_NOT_SPLIT = new SplitInfo(); + private static final SplitInfo SKIP_STORE = null; + + /** + * Data structure that returns from each Store that includes information + * about whether this Store should be split, and if so, the total size of + * the Store (aggregate of all StoreFiles) and the splitRow. */ - static class StoreSize { + static class SplitInfo { private final long size; private final byte [] row; + private final boolean canSplit; - StoreSize(long size, byte [] row) { + SplitInfo(long size, byte [] row) { this.size = size; this.row = row; + this.canSplit = true; + } + + SplitInfo() { + this.size = 0; + this.row = null; + this.canSplit = false; } + /* @return the size */ long getSize() { return size; @@ -1366,6 +1426,10 @@ byte [] getSplitRow() { return this.row; } + + boolean canSplit() { + return canSplit; + } } HRegion getHRegion() { Index: src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 929367) +++ src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -95,7 +95,7 @@ conf.getInt("hbase.hstore.blockingStoreFiles", -1); if (this.blockingStoreFilesNumber == -1) { this.blockingStoreFilesNumber = 1 + - conf.getInt("hbase.hstore.compactionThreshold", 3); + conf.getInt("hbase.hstore.compactionThreshold", 5); } this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); // default of 180 seconds @@ -235,11 +235,15 @@ lock.lock(); } try { - // See comment above for removeFromQueue on why we do not - // compact if removeFromQueue is true. Note that region.flushCache() - // only returns true if a flush is done and if a compaction is needed. - if (region.flushcache() && !removeFromQueue) { - server.compactSplitThread.compactionRequested(region, getName()); + // Perform flush. Returns whether we have exceeded compaction threshold + boolean needCompaction = region.flushcache(); + // If removeFromQueue is true, then we are in emergency mode so we + // should not do any compactions or splits. If false, check if we + // should split, if not, compact if we should compact. + if(!removeFromQueue) { + if(!checkStoreSizesAndSplit(region) && needCompaction) { + server.compactSplitThread.compactionRequested(region, getName()); + } } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical @@ -264,6 +268,33 @@ return true; } + /** + * Checks all Stores in this Region to see if they should be split. If any + * Store meets the split conditions, this will trigger a Region split. + * + * @param region the region to check if we should split it + * @return true if it was decided to perform a split, false if not + */ + private boolean checkStoreSizesAndSplit(final HRegion region) { + // Iterate stores of this region, checking if they are big enough to split + for(Store store : region.stores.values()) { + if(store.shouldSplit()) { + // Method of doing splits is now to set the force split flag + // (shouldSplit) to true and then triggering a compaction. + // This keeps everything synchronized via the compactSplitThread. + // When the compaction is triggered, the forced split will preempt + // an actual compaction from happening. + LOG.info("Split triggered on region " + region.toString() + + " because Store " + store.toString() + " exceeds max size"); + region.shouldSplit(true); + server.compactSplitThread.compactionRequested(region, + "store size > max size"); + return true; + } + } + return false; + } + /* * If too many store files already, schedule a compaction and pause a while * before going on with compaction. Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 929367) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -788,6 +788,7 @@ splitsAndClosesLock.readLock().lock(); try { byte [] splitRow = null; + boolean dontSplit = false; if (this.closed.get()) { return splitRow; } @@ -807,9 +808,20 @@ long startTime = System.currentTimeMillis(); doRegionCompactionPrep(); long maxSize = -1; + boolean forceSplit = shouldSplit(false); for (Store store: stores.values()) { - final Store.StoreSize ss = store.compact(majorCompaction); - if (ss != null && ss.getSize() > maxSize) { + final Store.SplitInfo ss = store.compact(majorCompaction, + forceSplit); + if(ss == null) { + // This Store does not match split criteria, but does not prevent + // other Stores from triggering the split. + continue; + } else if(!ss.canSplit()) { + // This Store cannot be split (has a reference or this is a major + // compaction) and should prevent the Region from being split. + dontSplit = true; + } else if (ss.getSize() > maxSize) { + // This Store should be split and trigger a Region split. maxSize = ss.getSize(); splitRow = ss.getSplitRow(); } @@ -824,6 +836,9 @@ writestate.notifyAll(); } } + if(dontSplit) { + return null; + } return splitRow; } finally { splitsAndClosesLock.readLock().unlock(); @@ -2691,7 +2706,11 @@ } /** - * For internal use in forcing splits ahead of file size limit. + * Used to trigger the split of a Region the next time it is compacted. + * + * This is flipped to true when a user manually triggers a split, or + * post-flush when any Store has exceeded its size limit. + * * @param b * @return previous value */