Index: CHANGES.txt
===================================================================
--- CHANGES.txt	(revision 1001981)
+++ CHANGES.txt	(working copy)
@@ -947,6 +947,8 @@
               (Andy Chen via Stack)
    HBASE-3030 The return code of many filesystem operations are not checked
               (dhruba borthakur via Stack)
+   HBASE-2646 Compaction requests should be prioritized to prevent blocking
+              (Jeff Whiting via Stack)
 
   NEW FEATURES
    HBASE-1961 HBase EC2 scripts
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java	(revision 1001981)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java	(working copy)
@@ -20,7 +20,8 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.EOFException;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.lang.Class;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
@@ -78,18 +79,43 @@
       this.length = l;
     }
 
+    // This section can be confusing. It is specific to how HDFS works.
+    // Let me try to break it down. This is the problem:
+    //
+    // 1. HDFS DataNodes update the NameNode about a filename's length
+    //    on block boundaries or when a file is closed. Therefore,
+    //    if an RS dies, then the NN's fs.getLength() can be out of date.
+    // 2. this.in.available() would work, but it returns int and
+    //    therefore breaks for files > 2GB (happens on big clusters).
+    // 3. DFSInputStream.getFileLength() gets the actual length from the DNs.
+    // 4. DFSInputStream is wrapped 2 levels deep: this.in.in.
+    //
+    // So, here we adjust getPos() using getFileLength() so the
+    // SequenceFile.Reader constructor (aka: first invocation) comes out
+    // with the correct end of the file:
+    //    this.end = in.getPos() + length;
     @Override
     public long getPos() throws IOException {
       if (this.firstGetPosInvocation) {
         this.firstGetPosInvocation = false;
-        // Tell a lie. We're doing this just so that this line up in
-        // SequenceFile.Reader constructor comes out with the correct length
-        // on the file:
-        // this.end = in.getPos() + length;
-        long available = this.in.available();
-        // Length gets added up in the SF.Reader constructor so subtract the
-        // difference. If available < this.length, then return this.length.
-        return available >= this.length? available - this.length: this.length;
+        long adjust = 0;
+
+        try {
+          Field fIn = FilterInputStream.class.getDeclaredField("in");
+          fIn.setAccessible(true);
+          Object realIn = fIn.get(this.in);
+          long realLength = ((Long)realIn.getClass().
+            getMethod("getFileLength", new Class[] {}).
+            invoke(realIn, new Object[] {})).longValue();
+          assert(realLength >= this.length);
+          adjust = realLength - this.length;
+        } catch (Exception e) {
+          SequenceFileLogReader.LOG.warn(
+            "Error while trying to get accurate file length. " +
+            "Truncation / data loss may occur if RegionServers die.", e);
+        }
+
+        return adjust + super.getPos();
       }
       return super.getPos();
     }
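
A note on point 2 of the new comment above: the failure is ordinary int
overflow, nothing HDFS-specific. The sketch below (illustrative only, not part
of the patch; the class name is made up) shows why an int-returning call like
available() cannot describe a WAL with more than Integer.MAX_VALUE unread
bytes, and why reflecting down to DFSInputStream.getFileLength(), which
returns long, avoids the problem:

    // Illustrative only -- not part of this patch.
    public class AvailableOverflowDemo {
      public static void main(String[] args) {
        long bytesRemaining = 3L * 1024 * 1024 * 1024;  // 3GB of WAL unread
        // Casting the true count to int silently goes negative:
        System.out.println((int) bytesRemaining);       // -1073741824
        // The most any int-returning API can ever report:
        System.out.println(Integer.MAX_VALUE);          // 2147483647
      }
    }
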
" + + "Truncation / data loss may occur if RegionServers die.", e); + } + + return adjust + super.getPos(); } return super.getPos(); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1001981) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -212,7 +212,8 @@ LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, getName(), + CompactSplitThread.Priority.HIGH_BLOCKING); // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1001981) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.HashSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -43,11 +40,37 @@ private final HRegionServer server; private final Configuration conf; - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); + private final PriorityCompactionQueue compactionQueue = + new PriorityCompactionQueue(); - private final HashSet regionsInQueue = new HashSet(); + /** The priorities for a compaction request. */ + public enum Priority implements Comparable { + //NOTE: All priorities should be numbered consecutively starting with 1. + //The highest priority should be 1 followed by all lower priorities. + //Priorities can be changed at anytime without requiring any changes to the + //queue. + /** HIGH_BLOCKING should only be used when an operation is blocked until a + * compact / split is done (e.g. a MemStore can't flush because it has + * "too many store files" and is blocking until a compact / split is done) + */ + HIGH_BLOCKING(1), + /** A normal compaction / split request */ + NORMAL(2), + /** A low compaction / split request -- not currently used */ + LOW(3); + + int value; + + Priority(int value) { + this.value = value; + } + + int getInt() { + return value; + } + } + /** * Splitting should not take place if the total number of regions exceed this. 
Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(revision 1001981)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(working copy)
@@ -20,9 +20,6 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
@@ -43,11 +40,37 @@
   private final HRegionServer server;
   private final Configuration conf;
 
-  private final BlockingQueue<HRegion> compactionQueue =
-    new LinkedBlockingQueue<HRegion>();
+  private final PriorityCompactionQueue compactionQueue =
+    new PriorityCompactionQueue();
 
-  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+  /** The priorities for a compaction request. */
+  public enum Priority implements Comparable<Priority> {
+    // NOTE: All priorities should be numbered consecutively starting
+    // with 1. The highest priority should be 1, followed by all lower
+    // priorities. Priorities can be changed at any time without
+    // requiring any changes to the queue.
+
+    /** HIGH_BLOCKING should only be used when an operation is blocked until a
+     * compact / split is done (e.g. a MemStore can't flush because it has
+     * "too many store files" and is blocking until a compact / split is done)
+     */
+    HIGH_BLOCKING(1),
+    /** A normal compaction / split request */
+    NORMAL(2),
+    /** A low compaction / split request -- not currently used */
+    LOW(3);
+
+    private final int value;
+
+    Priority(int value) {
+      this.value = value;
+    }
+
+    int getInt() {
+      return value;
+    }
+  }
 
   /**
    * Splitting should not take place if the total number of regions exceed this.
   * This is not a hard limit to the number of regions but it is a guideline to
@@ -74,9 +97,6 @@
     try {
       r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
       if (r != null && !this.server.isStopped()) {
-        synchronized (regionsInQueue) {
-          regionsInQueue.remove(r);
-        }
         lock.lock();
         try {
           // Don't interrupt us while we are working
@@ -107,23 +127,33 @@
         }
       }
     }
-    regionsInQueue.clear();
     compactionQueue.clear();
     LOG.info(getName() + " exiting");
   }
 
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
-    requestCompaction(r, false, why);
+    requestCompaction(r, false, why, Priority.NORMAL);
   }
 
+  public synchronized void requestCompaction(final HRegion r,
+      final String why, Priority p) {
+    requestCompaction(r, false, why, p);
+  }
+
+  public synchronized void requestCompaction(final HRegion r,
+      final boolean force, final String why) {
+    requestCompaction(r, force, why, Priority.NORMAL);
+  }
+
   /**
    * @param r HRegion store belongs to
    * @param force Whether next compaction should be major
   * @param why Why compaction requested -- used in debug messages
+   * @param priority Priority of this compaction request
    */
   public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why) {
+      final boolean force, final String why, Priority priority) {
     if (this.server.isStopped()) {
       return;
     }
@@ -131,14 +161,10 @@
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Compaction " + (force? "(major) ": "") +
         "requested for region " + r.getRegionNameAsString() +
-        (why != null && !why.isEmpty()? " because: " + why: ""));
+        (why != null && !why.isEmpty()? " because: " + why: "") +
+        "; Priority: " + priority + "; Compaction queue size: " +
+        compactionQueue.size());
     }
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        compactionQueue.add(r);
-        regionsInQueue.add(r);
-      }
-    }
+    compactionQueue.add(r, priority);
   }
 
   private void split(final HRegion parent, final byte [] midKey)
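
PriorityCompactionQueue itself is referenced but not included in this patch.
Because the regionsInQueue set and its contains() check are deleted above, the
queue presumably also takes over de-duplicating requests per region, ideally
upgrading a queued region's priority rather than dropping the new request. A
hypothetical sketch of that contract (class and method names assumed, not
taken from the real implementation):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch: orders entries by priority, breaks ties FIFO via
    // a sequence number, and de-duplicates regions like the removed
    // regionsInQueue set did.
    public class PriorityCompactionQueueSketch<R> {
      private static class Entry<T> implements Comparable<Entry<T>> {
        final T region;
        final int priority;   // lower value = served first
        final long seq;       // FIFO tie-breaker within a priority
        Entry(T region, int priority, long seq) {
          this.region = region;
          this.priority = priority;
          this.seq = seq;
        }
        public int compareTo(Entry<T> other) {
          if (priority != other.priority) {
            return priority - other.priority;
          }
          return seq < other.seq ? -1 : (seq > other.seq ? 1 : 0);
        }
      }

      private final PriorityBlockingQueue<Entry<R>> queue =
        new PriorityBlockingQueue<Entry<R>>();
      private final Map<R, Entry<R>> inQueue = new HashMap<R, Entry<R>>();
      private final AtomicLong sequence = new AtomicLong(0);

      /** Queue a region, or upgrade its priority if already queued lower. */
      public synchronized void add(R region, int priority) {
        Entry<R> existing = inQueue.get(region);
        if (existing != null) {
          if (existing.priority <= priority) {
            return;                 // already queued at >= this priority
          }
          queue.remove(existing);   // re-queue at the higher priority
        }
        Entry<R> e = new Entry<R>(region, priority, sequence.getAndIncrement());
        inQueue.put(region, e);
        queue.add(e);
      }

      public R poll(long timeout, TimeUnit unit) throws InterruptedException {
        Entry<R> e = queue.poll(timeout, unit);
        if (e == null) {
          return null;
        }
        synchronized (this) {
          inQueue.remove(e.region);
        }
        return e.region;
      }

      public synchronized void clear() {
        queue.clear();
        inQueue.clear();
      }

      public int size() {
        return queue.size();
      }
    }
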