Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (revision 1001977) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (working copy) @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; -import java.io.EOFException; +import java.io.FilterInputStream; import java.io.IOException; import java.lang.Class; import java.lang.reflect.Constructor; @@ -78,18 +78,43 @@ this.length = l; } + // This section can be confusing. It is specific to how HDFS works. + // Let me try to break it down. This is the problem: + // + // 1. HDFS DataNodes update the NameNode about a filename's length + // on block boundaries or when a file is closed. Therefore, + // if an RS dies, then the NN's fs.getLength() can be out of date + // 2. this.in.available() would work, but it returns int & + // therefore breaks for files > 2GB (happens on big clusters) + // 3. DFSInputStream.getFileLength() gets the actual length from the DNs + // 4. DFSInputStream is wrapped 2 levels deep : this.in.in + // + // So, here we adjust getPos() using getFileLength() so the + // SequenceFile.Reader constructor (aka: first invocation) comes out + // with the correct end of the file: + // this.end = in.getPos() + length; @Override public long getPos() throws IOException { if (this.firstGetPosInvocation) { this.firstGetPosInvocation = false; - // Tell a lie. We're doing this just so that this line up in - // SequenceFile.Reader constructor comes out with the correct length - // on the file: - // this.end = in.getPos() + length; - long available = this.in.available(); - // Length gets added up in the SF.Reader constructor so subtract the - // difference. If available < this.length, then return this.length. - return available >= this.length? available - this.length: this.length; + long adjust = 0; + + try { + Field fIn = FilterInputStream.class.getDeclaredField("in"); + fIn.setAccessible(true); + Object realIn = fIn.get(this.in); + long realLength = ((Long)realIn.getClass(). + getMethod("getFileLength", new Class []{}). + invoke(realIn, new Object []{})).longValue(); + assert(realLength >= this.length); + adjust = realLength - this.length; + } catch(Exception e) { + SequenceFileLogReader.LOG.warn( + "Error while trying to get accurate file length. " + + "Truncation / data loss may occur if RegionServers die.", e); + } + + return adjust + super.getPos(); } return super.getPos(); }