Index: C:/workspace/hbase-sync/conf/hbase-default.xml
===================================================================
--- C:/workspace/hbase-sync/conf/hbase-default.xml (revision 747917)
+++ C:/workspace/hbase-sync/conf/hbase-default.xml (working copy)
@@ -182,13 +182,18 @@
hbase.regionserver.hlog.blocksize
- 1048576
- Block size for HLog files. To minimize potential data loss,
- the size should be (avg key length) * (avg value length) * flushlogentries.
- Default 1MB.
+ 67108864
+ Block size for HLog files. Default 64MB.
+ hbase.regionserver.hlog.leaserecoveryperiod
+ 10000
+ Amount of time to wait between HLog lease recovery attempts.
+ Default: 10 seconds.
+
+
+
hbase.regionserver.thread.splitcompactcheckfrequency
20000
How often a region server runs the split/compaction check.
Index: C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
===================================================================
--- C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (revision 747917)
+++ C:/workspace/hbase-sync/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (working copy)
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
@@ -35,6 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -91,6 +94,8 @@
public class HLog implements HConstants, Syncable {
private static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
+ private static final SimpleDateFormat DATE_FORMAT =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
static final byte [] METAROW = Bytes.toBytes("METAROW");
final FileSystem fs;
@@ -146,7 +151,7 @@
* Keep the number of logs tidy.
*/
private final int maxLogs;
-
+
/**
* Create an edit log at the given dir location.
*
@@ -173,7 +178,7 @@
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.blocksize =
- conf.getLong("hbase.regionserver.hlog.blocksize", 1024L * 1024L);
+ conf.getLong("hbase.regionserver.hlog.blocksize", 64L * 1024L * 1024L);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -492,9 +497,14 @@
}
}
+ // This is public only because it implements a method in Syncable.
public void sync() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sync-ing " + unflushedEntries + ". Last flush time was: " +
+ DATE_FORMAT.format(new Date(lastLogFlushTime)));
+ }
lastLogFlushTime = System.currentTimeMillis();
- this.writer.sync();
+ this.writer.syncFs();
unflushedEntries = 0;
}
@@ -737,16 +747,42 @@
throws IOException {
Map logWriters =
new TreeMap(Bytes.BYTES_COMPARATOR);
+
+ long leaseRecoveryPeriod =
+ conf.getLong("hbase.regionserver.hlog.leaserecoveryperiod", 10000);
+
try {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + (i + 1) + " of " + logfiles.length + ": " +
logfiles[i].getPath());
}
- // Check for possibly empty file. With appends, currently Hadoop reports
- // a zero length even if the file has been sync'd. Revisit if
- // HADOOP-4751 is committed.
- boolean possiblyEmpty = logfiles[i].getLen() <= 0;
+ // Recover the files lease if necessary
+ boolean recovered = false;
+ while (!recovered) {
+ try {
+ FSDataOutputStream out = fs.append(logfiles[i].getPath());
+ out.close();
+ recovered = true;
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Triggering lease recovery.");
+ }
+ try {
+ Thread.sleep(leaseRecoveryPeriod);
+ } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ if (logfiles[i].getLen() <= 0) {
+ // If the file could be empty, skip it.
+ break;
+ }
+ continue;
+ }
+ }
+ if (!recovered) {
+ continue;
+ }
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
try {
@@ -815,18 +851,21 @@
} catch (IOException e) {
LOG.warn("Close in finally threw exception -- continuing", e);
}
- // Delete the input file now so we do not replay edits. We could
- // have gotten here because of an exception. If so, probably
- // nothing we can do about it. Replaying it, it could work but we
- // could be stuck replaying for ever. Just continue though we
- // could have lost some edits.
- fs.delete(logfiles[i].getPath(), true);
}
} catch (IOException e) {
- if (possiblyEmpty) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ if (e instanceof EOFException) {
+ // No recoverable data in file. Skip it.
continue;
}
- throw e;
+
+ } finally {
+ // Delete the input file now so we do not replay edits. We could
+ // have gotten here because of an exception. If so, probably
+ // nothing we can do about it. Replaying it, it could work but we
+ // could be stuck replaying for ever. Just continue though we
+ // could have lost some edits.
+ fs.delete(logfiles[i].getPath(), true);
}
}
} finally {