Index: src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java
===================================================================
--- src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java	(revision 894458)
+++ src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java	(working copy)
@@ -21,7 +21,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
@@ -59,6 +61,31 @@
     shutdownDfs(cluster);
     super.tearDown();
   }
+
+  /**
+   * Test the findMemstoresWithEditsOlderThan method.
+   * @throws IOException
+   */
+  public void testFindMemstoresWithEditsOlderThan() throws IOException {
+    Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
+    for (int i = 0; i < 10; i++) {
+      Long l = new Long(i);
+      regionsToSeqids.put(l.toString().getBytes(), l);
+    }
+    byte [][] regions =
+      HLog.findMemstoresWithEditsOlderThan(1, regionsToSeqids);
+    assertEquals(1, regions.length);
+    assertTrue(Bytes.equals(regions[0], "0".getBytes()));
+    regions = HLog.findMemstoresWithEditsOlderThan(3, regionsToSeqids);
+    int count = 3;
+    assertEquals(count, regions.length);
+    // Regions returned are not ordered.
+    for (int i = 0; i < count; i++) {
+      assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
+        Bytes.equals(regions[i], "1".getBytes()) ||
+        Bytes.equals(regions[i], "2".getBytes()));
+    }
+  }
 
   /**
    * Just write multiple logs then split. Before fix for HADOOP-2283, this
@@ -184,5 +211,4 @@
       }
     }
   }
-
 }
\ No newline at end of file
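For readers following the patch without a checkout, here is a self-contained sketch of the selection logic the new test exercises. The class FindOldMemstoresSketch and its findOlderThan helper are illustrative stand-ins that mirror, but are not, the patched HLog.findMemstoresWithEditsOlderThan:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FindOldMemstoresSketch {
      // Same contract as the patched HLog method: return every region whose
      // oldest in-memory edit predates the passed WAL sequence id.
      static List<byte []> findOlderThan(long oldestWALseqid,
          Map<byte [], Long> regionsToSeqids) {
        List<byte []> regions = new ArrayList<byte []>();
        for (Map.Entry<byte [], Long> e : regionsToSeqids.entrySet()) {
          if (e.getValue().longValue() < oldestWALseqid) {
            regions.add(e.getKey());
          }
        }
        return regions;
      }

      public static void main(String[] args) {
        // Same fixture the test builds: regions "0".."9" with seqids 0..9.
        Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
        for (int i = 0; i < 10; i++) {
          regionsToSeqids.put(Long.toString(i).getBytes(), Long.valueOf(i));
        }
        for (byte [] region : findOlderThan(3, regionsToSeqids)) {
          System.out.println(new String(region));
        }
      }
    }

Run as-is, it prints the region names 0, 1 and 2 in no particular order, matching the test's assertions.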
Index: src/java/org/apache/hadoop/hbase/regionserver/HLog.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HLog.java	(revision 894458)
+++ src/java/org/apache/hadoop/hbase/regionserver/HLog.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -54,17 +55,16 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 /**
  * HLog stores all the edits to the HStore.
@@ -130,7 +130,7 @@
     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
-   * Map of region to last sequence/edit id.
+   * Map of regions to first sequence/edit id in their memstore.
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
@@ -290,21 +290,21 @@
    * cacheFlushLock and then completeCacheFlush could be called which would wait
    * for the lock on this and consequently never release the cacheFlushLock
    *
-   * @return If lots of logs, flush the returned region so next time through
+   * @return If lots of logs, flush the returned regions so next time through
    * we can clean logs. Returns null if nothing to flush.
    * @throws FailedLogCloseException
    * @throws IOException
    */
-  public byte [] rollWriter() throws FailedLogCloseException, IOException {
+  public byte [][] rollWriter() throws FailedLogCloseException, IOException {
     // Return if nothing to flush.
     if (this.writer != null && this.numEntries.get() <= 0) {
       return null;
     }
-    byte [] regionToFlush = null;
+    byte [][] regionsToFlush = null;
    this.cacheFlushLock.lock();
     try {
       if (closed) {
-        return regionToFlush;
+        return regionsToFlush;
       }
       synchronized (updateLock) {
         // Clean up current writer.
@@ -330,7 +330,7 @@
         }
         this.outputfiles.clear();
       } else {
-        regionToFlush = cleanOldLogs();
+        regionsToFlush = cleanOldLogs();
       }
     }
     this.numEntries.set(0);
@@ -340,7 +340,7 @@
     } finally {
       this.cacheFlushLock.unlock();
     }
-    return regionToFlush;
+    return regionsToFlush;
   }
 
   protected SequenceFile.Writer createWriter(Path path) throws IOException {
@@ -363,8 +363,7 @@
    * we can clean logs. Returns null if nothing to flush.
    * @throws IOException
    */
-  private byte [] cleanOldLogs() throws IOException {
-    byte [] regionToFlush = null;
+  private byte [][] cleanOldLogs() throws IOException {
     Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
     // Get the set of all log files whose final ID is older than or
     // equal to the oldest pending region operation
@@ -372,31 +371,62 @@
       new TreeSet<Long>(this.outputfiles.headMap(
         (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet());
     // Now remove old log files (if any)
-    byte [] oldestRegion = null;
-    if (LOG.isDebugEnabled()) {
-      // Find region associated with oldest key -- helps debugging.
-      oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-      LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove " +
-        " out of total " + this.outputfiles.size() + "; " +
-        "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
-        " from region " + Bytes.toStringBinary(oldestRegion));
-    }
-    if (sequenceNumbers.size() > 0) {
+    int logsToRemove = sequenceNumbers.size();
+    if (logsToRemove > 0) {
+      if (LOG.isDebugEnabled()) {
+        // Find associated region; helps debugging.
+        byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
+        LOG.debug("Found " + logsToRemove + " hlogs to remove " +
+          " out of total " + this.outputfiles.size() + "; " +
+          "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+          " from region " + Bytes.toString(oldestRegion));
+      }
       for (Long seq : sequenceNumbers) {
         deleteLogFile(this.outputfiles.remove(seq), seq);
       }
     }
-    int countOfLogs = this.outputfiles.size() - sequenceNumbers.size();
-    if (countOfLogs > this.maxLogs) {
-      regionToFlush = oldestRegion != null?
-        oldestRegion: getOldestRegion(oldestOutstandingSeqNum);
-      LOG.info("Too many hlogs: logs=" + countOfLogs + ", maxlogs=" +
-        this.maxLogs + "; forcing flush of region with oldest edits: " +
-        Bytes.toStringBinary(regionToFlush));
+
+    // If too many log files, figure which regions we need to flush.
+    byte [][] regions = null;
+    int logCount = this.outputfiles.size() - logsToRemove;
+    if (logCount > this.maxLogs && this.outputfiles != null &&
+        this.outputfiles.size() > 0) {
+      regions = findMemstoresWithEditsOlderThan(this.outputfiles.firstKey(),
+        this.lastSeqWritten);
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < regions.length; i++) {
+        if (i > 0) sb.append(", ");
+        sb.append(Bytes.toStringBinary(regions[i]));
+      }
+      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+        this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
+        sb.toString());
     }
-    return regionToFlush;
+    return regions;
   }
 
+  /**
+   * Return regions (memstores) that have edits that are less than the passed
+   * oldestWALseqid.
+   * @param oldestWALseqid
+   * @param regionsToSeqids
+   * @return All regions whose seqid is less than oldestWALseqid (Not
+   * necessarily in order). Null if no regions found.
+   */
+  static byte [][] findMemstoresWithEditsOlderThan(final long oldestWALseqid,
+      final Map<byte [], Long> regionsToSeqids) {
+    // This method is static so it can be unit tested more easily.
+    List<byte []> regions = null;
+    for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
+      if (e.getValue().longValue() < oldestWALseqid) {
+        if (regions == null) regions = new ArrayList<byte []>();
+        regions.add(e.getKey());
+      }
+    }
+    return regions == null?
+      null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
+  }
+
   /*
    * @return Logs older than this id are safe to remove.
    */
@@ -537,7 +567,8 @@
     long seqNum = obtainSeqNum();
     logKey.setLogSeqNum(seqNum);
     // The 'lastSeqWritten' map holds the sequence number of the oldest
-    // write for each region. When the cache is flushed, the entry for the
+    // write for each region (i.e. the first edit added to the particular
+    // memstore). When the cache is flushed, the entry for the
     // region being flushed is removed if the sequence number of the flush
     // is greater than or equal to the value in lastSeqWritten.
     this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
@@ -586,7 +617,8 @@
     long seqNum [] = obtainSeqNum(edits.size());
     synchronized (this.updateLock) {
       // The 'lastSeqWritten' map holds the sequence number of the oldest
-      // write for each region. When the cache is flushed, the entry for the
+      // write for each region (i.e. the first edit added to the particular
+      // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
       this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
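The cleanOldLogs change above is the heart of the patch: when more than maxLogs files remain after deletion, every region whose first unflushed edit is older than the oldest retained log gets flushed, so the next roll can delete that log. Below is a hedged, self-contained model of that decision, using plain longs and illustrative names (logSeqIds, firstEditSeqByRegion) rather than HLog internals:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class CleanOldLogsSketch {
      public static void main(String[] args) {
        // Highest sequence id written to each rolled log file, as
        // outputfiles tracks, mapped to a pretend file name.
        TreeMap<Long, String> logSeqIds = new TreeMap<Long, String>();
        logSeqIds.put(10L, "hlog.10");
        logSeqIds.put(20L, "hlog.20");
        logSeqIds.put(30L, "hlog.30");
        logSeqIds.put(40L, "hlog.40");

        // First (oldest) unflushed edit per region, as lastSeqWritten tracks.
        Map<String, Long> firstEditSeqByRegion = new HashMap<String, Long>();
        firstEditSeqByRegion.put("regionA", 5L);   // pins every log
        firstEditSeqByRegion.put("regionB", 35L);

        int maxLogs = 3;
        long oldestOutstanding = 5L;  // min of firstEditSeqByRegion values

        // No log ends at or before seqid 5, so nothing can be deleted yet.
        int removable = logSeqIds.headMap(oldestOutstanding + 1).size();
        int remaining = logSeqIds.size() - removable;

        if (remaining > maxLogs) {
          // Flush every region whose first edit predates the oldest log's
          // id; once regionA flushes, logs 10..30 stop being pinned.
          long oldestLogSeqid = logSeqIds.firstKey();
          for (Map.Entry<String, Long> e : firstEditSeqByRegion.entrySet()) {
            if (e.getValue() < oldestLogSeqid) {
              System.out.println("would flush " + e.getKey());
            }
          }
        }
      }
    }

With these numbers the sketch prints "would flush regionA": regionA alone holds edits older than the oldest log, so flushing it (and not regionB) is enough to let the old logs go on the next roll.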
Index: src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(revision 894458)
+++ src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(working copy)
@@ -77,9 +77,9 @@
       rollLock.lock();  // Don't interrupt us. We're working
       try {
         this.lastrolltime = now;
-        byte [] regionToFlush = server.getLog().rollWriter();
-        if (regionToFlush != null) {
-          scheduleFlush(regionToFlush);
+        byte [][] regionsToFlush = server.getLog().rollWriter();
+        if (regionsToFlush != null) {
+          for (byte [] r: regionsToFlush) scheduleFlush(r);
         }
       } catch (FailedLogCloseException e) {
         LOG.fatal("Forcing server shutdown", e);
@@ -142,4 +142,4 @@
     rollLock.unlock();
   }
 }
-}
+}
\ No newline at end of file
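To show the widened contract end to end, here is a sketch of the caller pattern LogRoller now uses; HLogLike is a hypothetical stand-in interface, not an actual HBase class:

    public class RollAndFlushSketch {
      interface HLogLike {
        // Post-patch contract: null, or the regions to flush so the next
        // roll can clean old logs.
        byte [][] rollWriter();
      }

      static void rollOnce(HLogLike log) {
        byte [][] regionsToFlush = log.rollWriter();
        if (regionsToFlush != null) {
          // Pre-patch code scheduled at most one flush; now every region
          // pinning an old log gets a flush request.
          for (byte [] r : regionsToFlush) {
            scheduleFlush(r);
          }
        }
      }

      static void scheduleFlush(byte [] regionName) {
        System.out.println("flush requested for " + new String(regionName));
      }

      public static void main(String[] args) {
        rollOnce(new HLogLike() {
          public byte [][] rollWriter() {
            return new byte [][] {"regionA".getBytes(), "regionB".getBytes()};
          }
        });
      }
    }

Scheduling one flush per returned region, instead of the single flush the old byte [] contract allowed, is what lets a backlog of pinned logs clear in one roll cycle.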