Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1340619)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy)
@@ -21,8 +21,10 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -32,7 +34,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -40,6 +44,7 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 
+
 /**
  * Implementation of {@link HLog.Writer} that delegates to
  * SequenceFile.Writer.
@@ -62,9 +67,21 @@
   // This is the FSDataOutputStream instance that is the 'out' instance
   // in the SequenceFile.Writer 'writer' instance above.
   private FSDataOutputStream writer_out;
+  private boolean durableSync;
+  private FSDataOutputStream stream = null;
 
   private Class<? extends HLogKey> keyClass;
 
+  private static Class<?> optionClz = null;
+  private static Method hsync;
+  static {
+    try {
+      hsync = SequenceFile.Writer.class.getMethod("hsync");
+      optionClz = Class.forName("org.apache.hadoop.io.SequenceFile$Writer$Option");
+    } catch (Exception e) {
+    }
+  }
+
   /**
    * Context used by our wal dictionary compressor. Null if we're not to do
    * our custom dictionary compression. This custom WAL compression is distinct
@@ -139,44 +156,87 @@
         throw new IOException("Failed to initiate CompressionContext", e);
       }
     }
+    durableSync = conf.getBoolean(HConstants.ENABLE_WAL_DURABLE_SYNC, false);
 
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
     }
 
-    // Create a SF.Writer instance.
-    try {
-      // reflection for a version of SequenceFile.createWriter that doesn't
-      // automatically create the parent directory (see HBASE-2312)
-      this.writer = (SequenceFile.Writer) SequenceFile.class
-        .getMethod("createWriter", new Class[] {FileSystem.class,
-            Configuration.class, Path.class, Class.class, Class.class,
-            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
-            CompressionType.class, CompressionCodec.class, Metadata.class})
-        .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
-            WALEdit.class,
-            Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
-            Short.valueOf((short)
-              conf.getInt("hbase.regionserver.hlog.replication",
-              fs.getDefaultReplication())),
-            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
-                fs.getDefaultBlockSize())),
-            Boolean.valueOf(false) /*createParent*/,
-            SequenceFile.CompressionType.NONE, new DefaultCodec(),
-            createMetadata(conf, compress)
-            });
-    } catch (InvocationTargetException ite) {
-      // function was properly called, but threw it's own exception
-      throw new IOException(ite.getCause());
-    } catch (Exception e) {
-      // ignore all other exceptions. related to reflection failure
+    // reflection for a version of SequenceFile.createWriter that doesn't
+    // automatically create the parent directory (see HBASE-2312)
+    if (optionClz != null && durableSync) {
+      try {
+        stream = FSUtils.create(fs, path, FsPermission.getDefault(), false, true, true);
+        Object optionArray = Array.newInstance(optionClz, 5);
+        Array.set(optionArray, 0,
+            SequenceFile.Writer.class
+                .getMethod("stream", new Class[] { FSDataOutputStream.class })
+                .invoke(null, new Object[] { stream }));
+        Array.set(optionArray, 1,
+            SequenceFile.Writer.class
+                .getMethod("keyClass", new Class[] { Class.class })
+                .invoke(null, new Object[] { keyClass }));
+        Array.set(optionArray, 2,
+            SequenceFile.Writer.class
+                .getMethod("valueClass", new Class[] { Class.class })
+                .invoke(null, new Object[] { WALEdit.class }));
+        Array.set(optionArray, 3,
+            SequenceFile.Writer.class
+                .getMethod("compression",
+                    new Class[] { CompressionType.class, CompressionCodec.class })
+                .invoke(null, new Object[] { SequenceFile.CompressionType.NONE,
+                    new DefaultCodec() }));
+        Array.set(optionArray, 4,
+            SequenceFile.Writer.class
+                .getMethod("metadata", new Class[] { Metadata.class })
+                .invoke(null, new Object[] { createMetadata(conf, compress) }));
+        this.writer = (SequenceFile.Writer) SequenceFile.class
+            .getMethod("createWriter",
+                new Class[] { Configuration.class, optionArray.getClass() })
+            .invoke(null, new Object[] { conf, optionArray });
+      } catch (InvocationTargetException ite) {
+        // function was properly called, but threw its own exception
+        throw new IOException(ite.getCause());
+      } catch (Exception e) {
+        // If reflection failed once, no need to try again
+        optionClz = null;
+      }
+    }
+
+    if (this.writer == null) {
+      if (durableSync) {
+        LOG.info("Durable sync requested but -- HDFS-744 -- not available");
+      }
+      try {
+        // reflection for a version of SequenceFile.createWriter that doesn't
+        // automatically create the parent directory (see HBASE-2312)
+        this.writer = (SequenceFile.Writer) SequenceFile.class
+          .getMethod("createWriter", new Class[] {FileSystem.class,
+              Configuration.class, Path.class, Class.class, Class.class,
+              Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
+              CompressionType.class, CompressionCodec.class, Metadata.class})
+          .invoke(null, new Object[] {fs, conf, path, keyClass,
+              WALEdit.class,
+              Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
+              Short.valueOf((short)
+                conf.getInt("hbase.regionserver.hlog.replication",
+                fs.getDefaultReplication())),
+              Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
+                  fs.getDefaultBlockSize())),
+              Boolean.valueOf(false) /*createParent*/,
+              SequenceFile.CompressionType.NONE, new DefaultCodec(),
+              createMetadata(conf, compress)
+              });
+      } catch (InvocationTargetException ite) {
+        // function was properly called, but threw its own exception
+        throw new IOException(ite.getCause());
+      } catch (Exception e) {
+        // ignore all other exceptions. related to reflection failure
+      }
+    }
+
     // if reflection failed, use the old createWriter
     if (this.writer == null) {
       LOG.debug("new createWriter -- HADOOP-6840 -- not available");
       this.writer = SequenceFile.createWriter(fs, conf, path,
-        HLog.getKeyClass(conf), WALEdit.class,
+        keyClass, WALEdit.class,
         fs.getConf().getInt("io.file.buffer.size", 4096),
         (short) conf.getInt("hbase.regionserver.hlog.replication",
           fs.getDefaultReplication()),
@@ -241,11 +301,31 @@
       }
       this.writer = null;
     }
+    if (this.stream != null) {
+      try {
+        this.stream.close();
+      } catch (NullPointerException npe) {
+        LOG.warn(npe);
+      }
+      this.stream = null;
+    }
   }
 
   @Override
   public void sync() throws IOException {
-    this.writer.syncFs();
+    if (durableSync || hsync == null) {
+      this.writer.syncFs();
+    } else {
+      try {
+        hsync.invoke(this.writer);
+      } catch (InvocationTargetException ite) {
+        throw new IOException(ite);
+      } catch (Exception e) {
+        // on reflection exception, don't try again
+        hsync = null;
+        this.writer.syncFs();
+      }
+    }
   }
 
   @Override
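For reference, on a Hadoop release that already ships the SequenceFile.Writer.Option API and the HDFS-744 sync support, the reflective calls above resolve to the direct form sketched below. This is a non-authoritative illustration, not part of the patch; the helper name createOptionWriter is invented, but the stream/keyClass/valueClass/compression/metadata option factories and hsync() are exactly the members the patch looks up by name:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.DefaultCodec;

// Sketch only: direct equivalent of the Option-based createWriter reflection.
static SequenceFile.Writer createOptionWriter(Configuration conf,
    FSDataOutputStream stream, Class<?> keyClass,
    SequenceFile.Metadata metadata) throws IOException {
  return SequenceFile.createWriter(conf,
      SequenceFile.Writer.stream(stream),        // the pre-created output stream
      SequenceFile.Writer.keyClass(keyClass),    // HLogKey subclass from conf
      SequenceFile.Writer.valueClass(WALEdit.class),
      SequenceFile.Writer.compression(
          SequenceFile.CompressionType.NONE, new DefaultCodec()),
      SequenceFile.Writer.metadata(metadata));   // createMetadata(conf, compress)
}

On such a Writer, hsync() forces each datanode to flush block data to disk, closing the durability gap that syncFs()/hflush() leave open (those only guarantee the data reached the datanode pipeline).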
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1340619)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -662,6 +662,12 @@
   public static final String ENABLE_WAL_COMPRESSION =
     "hbase.regionserver.wal.enablecompression";
 
+  public static final String ENABLE_WAL_DURABLE_SYNC =
+    "hbase.regionserver.wal.durable.sync";
+
+  public static final String ENABLE_HFILE_DURABLE_SYNC =
+    "hbase.regionserver.hfile.durable.sync";
+
   /** Region in Transition metrics threshold time */
   public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
 
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (revision 1340619)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (working copy)
@@ -266,6 +266,7 @@
       FileSystem fs, Path path) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms);
+    boolean durableSync = conf.getBoolean(HConstants.ENABLE_HFILE_DURABLE_SYNC, false);
+    return FSUtils.create(fs, path, perms, true, true, durableSync);
   }
 }
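Both keys added above default to false, so existing deployments see no behavior change unless an operator opts in. A minimal sketch of enabling them programmatically (the class name is illustrative; the same booleans can equally be set in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class DurableSyncConfigExample {
  public static Configuration make() {
    Configuration conf = HBaseConfiguration.create();
    // hbase.regionserver.wal.durable.sync: WAL path (SequenceFileLogWriter)
    conf.setBoolean(HConstants.ENABLE_WAL_DURABLE_SYNC, true);
    // hbase.regionserver.hfile.durable.sync: HFile path (AbstractHFileWriter)
    conf.setBoolean(HConstants.ENABLE_HFILE_DURABLE_SYNC, true);
    return conf;
  }
}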
Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1340619)
+++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -25,9 +25,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
@@ -58,6 +61,7 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -74,7 +78,15 @@
   /** Full access permissions (starting point for a umask) */
   private static final String FULL_RWX_PERMISSIONS = "777";
-
+  private static Object[] createFlags = null;
+  static {
+    try {
+      createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag").getEnumConstants();
+      if (createFlags.length != 4)
+        createFlags = null;
+    } catch (Exception e) {
+    }
+  }
 
   protected FSUtils() {
     super();
   }
@@ -168,6 +180,72 @@
   }
 
   /**
+   * Create the specified file on the filesystem. By default, this will:
+   *
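The section cuts off before the body of the new FSUtils.create() overload, so the following is only a guess at its non-reflective equivalent. The static block above insists on CreateFlag having exactly four constants, i.e. CREATE, OVERWRITE, APPEND plus the SYNC_BLOCK flag introduced by HDFS-744, which suggests a create call along these lines (the method and parameter names here are assumptions, not the patch's actual code, and the real patch reaches this API reflectively so it still compiles against pre-HDFS-744 Hadoop):

import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

// Hypothetical sketch of the durable create path on an HDFS-744-capable Hadoop.
static FSDataOutputStream createDurable(FileSystem fs, Path path,
    FsPermission perm, boolean overwrite) throws IOException {
  // SYNC_BLOCK asks HDFS to fsync each block to disk as it is written.
  EnumSet<CreateFlag> flags = overwrite
      ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK)
      : EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
  return fs.create(path, perm, flags,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      fs.getDefaultReplication(), fs.getDefaultBlockSize(),
      null /* no Progressable */);
}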