Index: C:/workspace/hbase-sync/conf/hbase-default.xml =================================================================== --- C:/workspace/hbase-sync/conf/hbase-default.xml (revision 747917) +++ C:/workspace/hbase-sync/conf/hbase-default.xml (working copy) @@ -182,13 +182,18 @@ hbase.regionserver.hlog.blocksize - 1048576 - Block size for HLog files. To minimize potential data loss, - the size should be (avg key length) * (avg value length) * flushlogentries. - Default 1MB. + 67108864 + Block size for HLog files. Default 64MB. + hbase.regionserver.hlog.leaserecoveryperiod + 10000 + Amount of time, in milliseconds, to wait between HLog lease recovery attempts. + Default 10 seconds. + + + hbase.regionserver.thread.splitcompactcheckfrequency 20000 How often a region server runs the split/compaction check. Index: C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java =================================================================== --- C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (revision 747917) +++ C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (working copy) @@ -24,6 +24,8 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Collections; import java.util.Map; import java.util.SortedMap; @@ -35,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -91,6 +94,8 @@ public class HLog implements HConstants, Syncable { private static final Log LOG = LogFactory.getLog(HLog.class); private static final String HLOG_DATFILE = "hlog.dat."; + private static final SimpleDateFormat DATE_FORMAT = + new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS"); static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:"); static final byte [] METAROW = Bytes.toBytes("METAROW"); final FileSystem fs; @@ -146,7 +151,7 @@ * Keep the number of logs tidy. */ private final int maxLogs; - + /** * Create an edit log at the given dir location. * @@ -173,7 +178,7 @@ this.flushlogentries = conf.getInt("hbase.regionserver.flushlogentries", 100); this.blocksize = - conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L); + conf.getLong("hbase.regionserver.hlog.blocksize", 64L * 1024L * 1024L); this.optionalFlushInterval = conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); @@ -492,9 +497,14 @@ } } + // This is public only because it implements a method in Syncable. public void sync() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync-ing " + unflushedEntries + ". Last flush time was: " + + DATE_FORMAT.format(new Date(lastLogFlushTime))); + } lastLogFlushTime = System.currentTimeMillis(); - this.writer.sync(); + this.writer.syncFs(); unflushedEntries = 0; } @@ -737,16 +747,42 @@ throws IOException { Map logWriters = new TreeMap(Bytes.BYTES_COMPARATOR); + + long leaseRecoveryPeriod = + conf.getLong("hbase.regionserver.hlog.leaserecoveryperiod", 10000); + try { for (int i = 0; i < logfiles.length; i++) { if (LOG.isDebugEnabled()) { LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " + logfiles[i].getPath()); } - // Check for possibly empty file. With appends, currently Hadoop reports - // a zero length even if the file has been sync'd. Revisit if - // HADOOP-4751 is committed. 
- boolean possiblyEmpty = logfiles[i].getLen() <= 0; + // Recover the file's lease if necessary + boolean recovered = false; + while (!recovered) { + try { + FSDataOutputStream out = fs.append(logfiles[i].getPath()); + out.close(); + recovered = true; + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering lease recovery."); + } + try { + Thread.sleep(leaseRecoveryPeriod); + } catch (InterruptedException ex) { + // ignore it and try again + } + if (logfiles[i].getLen() <= 0) { + // If the file could be empty, skip it. + break; + } + continue; + } + } + if (!recovered) { + continue; + } HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); try { @@ -815,18 +851,21 @@ } catch (IOException e) { LOG.warn("Close in finally threw exception -- continuing", e); } - // Delete the input file now so we do not replay edits. We could - // have gotten here because of an exception. If so, probably - // nothing we can do about it. Replaying it, it could work but we - // could be stuck replaying for ever. Just continue though we - // could have lost some edits. - fs.delete(logfiles[i].getPath(), true); } } catch (IOException e) { - if (possiblyEmpty) { + e = RemoteExceptionHandler.checkIOException(e); + if (e instanceof EOFException) { + // No recoverable data in file. Skip it. continue; } - throw e; + } finally { + // Delete the input file now so we do not replay edits. We could + // have gotten here because of an exception. If so, probably + // nothing we can do about it. Replaying it, it could work but we + // could be stuck replaying for ever. Just continue though we + // could have lost some edits. + fs.delete(logfiles[i].getPath(), true); } } } finally {