Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1345032) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy) @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.TreeMap; import org.apache.commons.logging.Log; @@ -32,7 +34,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -40,6 +44,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; + /** * Implementation of {@link HLog.Writer} that delegates to * SequenceFile.Writer. @@ -62,9 +67,22 @@ // This is the FSDataOutputStream instance that is the 'out' instance // in the SequenceFile.Writer 'writer' instance above. private FSDataOutputStream writer_out; + private boolean durableSync; + private boolean parallelWrites; + private FSDataOutputStream stream = null; private Class keyClass; + private static Class optionClz = null; + private static Method hsync; + static { + try { + hsync = SequenceFile.Writer.class.getMethod("hsync"); + optionClz = Class.forName("org.apache.hadoop.io.SequenceFile$Writer$Option"); + } catch (Exception e) { + } + } + /** * Context used by our wal dictionary compressor. Null if we're not to do * our custom dictionary compression. This custom WAL compression is distinct @@ -139,44 +157,89 @@ throw new IOException("Failed to initiate CompressionContext", e); } } + durableSync = conf.getBoolean(HConstants.ENABLE_WAL_DURABLE_SYNC, false); + parallelWrites = conf.getBoolean(HConstants.ENABLE_WAL_PARALLEL_WRITES, false); if (null == keyClass) { keyClass = HLog.getKeyClass(conf); } - // Create a SF.Writer instance. 
-    try {
-      // reflection for a version of SequenceFile.createWriter that doesn't
-      // automatically create the parent directory (see HBASE-2312)
-      this.writer = (SequenceFile.Writer) SequenceFile.class
-        .getMethod("createWriter", new Class[] {FileSystem.class,
-            Configuration.class, Path.class, Class.class, Class.class,
-            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
-            CompressionType.class, CompressionCodec.class, Metadata.class})
-        .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
-            WALEdit.class,
-            Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
-            Short.valueOf((short)
-              conf.getInt("hbase.regionserver.hlog.replication",
-              fs.getDefaultReplication())),
-            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
-              fs.getDefaultBlockSize())),
-            Boolean.valueOf(false) /*createParent*/,
-            SequenceFile.CompressionType.NONE, new DefaultCodec(),
-            createMetadata(conf, compress)
-            });
-    } catch (InvocationTargetException ite) {
-      // function was properly called, but threw it's own exception
-      throw new IOException(ite.getCause());
-    } catch (Exception e) {
-      // ignore all other exceptions. related to reflection failure
+    // reflection for a version of SequenceFile.createWriter that doesn't
+    // automatically create the parent directory (see HBASE-2312)
+    if (optionClz != null && (durableSync || parallelWrites)) {
+      try {
+        stream = FSUtils.create(fs, path, FsPermission.getDefault(), false, true, durableSync,
+          parallelWrites);
+        Object optionArray = Array.newInstance(optionClz, 5);
+        Array.set(optionArray, 0,
+          SequenceFile.Writer.class
+            .getMethod("stream", new Class[] { FSDataOutputStream.class })
+            .invoke(null, new Object[] { stream }));
+        Array.set(optionArray, 1,
+          SequenceFile.Writer.class
+            .getMethod("keyClass", new Class[] { Class.class })
+            .invoke(null, new Object[] { keyClass }));
+        Array.set(optionArray, 2,
+          SequenceFile.Writer.class
+            .getMethod("valueClass", new Class[] { Class.class })
+            .invoke(null, new Object[] { WALEdit.class }));
+        Array.set(optionArray, 3,
+          SequenceFile.Writer.class
+            .getMethod("compression", new Class[] { CompressionType.class, CompressionCodec.class })
+            .invoke(null, new Object[] { SequenceFile.CompressionType.NONE, new DefaultCodec() }));
+        Array.set(optionArray, 4,
+          SequenceFile.Writer.class
+            .getMethod("metadata", new Class[] { Metadata.class })
+            .invoke(null, new Object[] { createMetadata(conf, compress) }));
+        this.writer = (SequenceFile.Writer) SequenceFile.class
+          .getMethod("createWriter", new Class[] { Configuration.class, optionArray.getClass() })
+          .invoke(null, new Object[] { conf, optionArray });
+      } catch (InvocationTargetException ite) {
+        // function was properly called, but threw its own exception
+        throw new IOException(ite.getCause());
+      } catch (Exception e) {
+        // If reflection failed once, no need to try again
+        optionClz = null;
+      }
+    }
+    if (this.writer == null) {
+      if (durableSync) {
+        LOG.info("Durable sync requested but -- HDFS-744 -- not available");
+      }
+      try {
+        // reflection for a version of SequenceFile.createWriter that doesn't
+        // automatically create the parent directory (see HBASE-2312)
+        this.writer = (SequenceFile.Writer) SequenceFile.class
+          .getMethod("createWriter", new Class[] {FileSystem.class,
+              Configuration.class, Path.class, Class.class, Class.class,
+              Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
+              CompressionType.class, CompressionCodec.class, Metadata.class})
+          .invoke(null, new Object[] {fs, conf, path, keyClass,
+              WALEdit.class,
+              Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
+              Short.valueOf((short)
+                conf.getInt("hbase.regionserver.hlog.replication",
+                fs.getDefaultReplication())),
+              Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
+                fs.getDefaultBlockSize())),
+              Boolean.valueOf(false) /*createParent*/,
+              SequenceFile.CompressionType.NONE, new DefaultCodec(),
+              createMetadata(conf, compress)
+              });
+      } catch (InvocationTargetException ite) {
+        // function was properly called, but threw its own exception
+        throw new IOException(ite.getCause());
+      } catch (Exception e) {
+        // ignore all other exceptions; related to reflection failure
+      }
+    }
+
     // if reflection failed, use the old createWriter
     if (this.writer == null) {
       LOG.debug("new createWriter -- HADOOP-6840 -- not available");
       this.writer = SequenceFile.createWriter(fs, conf, path,
-        HLog.getKeyClass(conf), WALEdit.class,
+        keyClass, WALEdit.class,
         fs.getConf().getInt("io.file.buffer.size", 4096),
         (short) conf.getInt("hbase.regionserver.hlog.replication",
           fs.getDefaultReplication()),
@@ -241,10 +304,29 @@
       }
       this.writer = null;
     }
+    if (this.stream != null) {
+      try {
+        this.stream.close();
+      } catch (NullPointerException npe) {
+        LOG.warn(npe);
+      }
+      this.stream = null;
+    }
   }
 
   @Override
   public void sync() throws IOException {
+    if (durableSync && hsync != null) {
+      try {
+        hsync.invoke(this.writer);
+        return;
+      } catch (InvocationTargetException ite) {
+        throw new IOException(ite);
+      } catch (Exception e) {
+        // on reflection exception, don't try again
+        hsync = null;
+      }
+    }
     this.writer.syncFs();
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (revision 1345032)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (working copy)
@@ -266,6 +266,7 @@
       FileSystem fs, Path path) throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms);
+    boolean durableSync = conf.getBoolean(HConstants.ENABLE_HFILE_DURABLE_SYNC, false);
+    return FSUtils.create(fs, path, perms, true, true, durableSync, false);
   }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1345032)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -25,9 +25,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
@@ -58,6 +61,7 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -74,7 +78,15 @@
   /** Full access permissions (starting point for a umask) */
   private static final String FULL_RWX_PERMISSIONS = "777";
-
+  private static Object[] createFlags = null;
+  static {
+    try {
+      createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag").getEnumConstants();
+      if (createFlags.length != 5)
+        createFlags = null;
+    } catch (Exception e) { // CreateFlag class not available; use the fs.create() fallback below
+    }
+  }
   protected FSUtils() {
     super();
   }
@@ -168,6 +180,74 @@
   }
 
   /**
+   * Create the specified file on the filesystem. By default, this will:
+   * <ol>
+   * <li>apply the umask in the configuration (if it is enabled)</li>
+   * <li>use the fs configured buffer size (or {@value DEFAULT_BUFFER_SIZE} if
+   * not set)</li>
+   * <li>use the default replication</li>
+   * <li>use the default block size</li>
+   * <li>not track progress</li>
+   * </ol>
+   *
+   * @param fs {@link FileSystem} on which to write the file
+   * @param path {@link Path} to the file to write
+   * @param perm permissions to apply to the created file
+   * @param createParent Whether to create the parent directory if it does not exist.
+   * @param overwrite Whether to overwrite the file if it already exists.
+   * @param syncBlocks Open the file with SYNC_BLOCK set to support durable sync (fsync).
+   * @param parallelWrites Open the file with PARALLEL_WRITE set (HDFS-1783).
+   * @return output stream to the created file
+   * @throws IOException if the file cannot be created
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
+      boolean createParent, boolean overwrite, boolean syncBlocks, boolean parallelWrites)
+      throws IOException {
+    LOG.debug("Creating file: " + path + " with permission: " + perm);
+
+    if (createFlags != null && (syncBlocks || createParent || parallelWrites)) {
+      try {
+        EnumSet flags = EnumSet.of((Enum) createFlags[0]); // CREATE
+        if (overwrite) flags.add(createFlags[1]); // OVERWRITE
+        if (syncBlocks) flags.add(createFlags[3]); // SYNC_BLOCK
+        if (parallelWrites) flags.add(createFlags[4]); // PARALLEL_WRITE
+
+        return (FSDataOutputStream) FileSystem.class
+          .getMethod(
+            createParent ? "create" : "createNonRecursive",
+            new Class[] { Path.class, FsPermission.class, EnumSet.class,
+                Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class })
+          .invoke(
+            // use the backingFs for HFileSystem
+            fs instanceof HFileSystem ? ((HFileSystem) fs).getBackingFs() : fs,
+            new Object[] {
+                path,
+                perm,
+                flags,
+                fs.getConf().getInt("io.file.buffer.size", 4096),
+                (short) fs.getConf().getInt("hbase.regionserver.hlog.replication",
+                    fs.getDefaultReplication()),
+                fs.getConf().getLong("hbase.regionserver.hlog.blocksize",
+                    fs.getDefaultBlockSize()),
+                null });
+      } catch (InvocationTargetException ite) {
+        // function was properly called, but threw its own exception
+        throw new IOException(ite.getCause());
+      } catch (Exception e) {
+        // on reflection errors, only try once
+        createFlags = null;
+      }
+    }
+    if (syncBlocks || createParent || parallelWrites) {
+      LOG.debug("FSUtils.create -- HDFS-744/HDFS-1783 -- not available");
+    }
+
+    return fs.create(path, perm, overwrite,
+      fs.getConf().getInt("io.file.buffer.size", 4096),
+      fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
+  }
+
+  /**
   * Get the file permissions specified in the configuration, if they are
   * enabled.
   *
Index: hbase-server/pom.xml
===================================================================
--- hbase-server/pom.xml (revision 1345032)
+++ hbase-server/pom.xml (working copy)
@@ -586,13 +586,10 @@
           <value>3.0</value>
-      <properties>
-        <hadoop.version>3.0-SNAPSHOT</hadoop.version>
-      </properties>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
+        <artifactId>hadoop-client</artifactId>
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
Index: pom.xml
===================================================================
--- pom.xml (revision 1345032)
+++ pom.xml (working copy)
@@ -1223,10 +1223,11 @@
         <slf4j.version>1.6.1</slf4j.version>
         <hadoop.version>3.0.0-SNAPSHOT</hadoop.version>
+
       <dependency>
         <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-common</artifactId>
+        <artifactId>hadoop-client</artifactId>
         <version>${hadoop.version}</version>
@@ -1243,6 +1244,7 @@
         <scope>compile</scope>
+
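
Note on the CreateFlag path in FSUtils.create: on a Hadoop version whose CreateFlag already ships SYNC_BLOCK (HDFS-744 landed upstream in Hadoop 2.x), the reflective call above resolves to the plain FileSystem.create overload shown below. This is a minimal sketch for illustration only, not part of the patch: createDurable is a hypothetical helper name, and it covers only the SYNC_BLOCK case because PARALLEL_WRITE exists solely in the patched Hadoop this diff targets.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class DurableCreateSketch {
  /** What the reflective path amounts to when CreateFlag is present. */
  public static FSDataOutputStream createDurable(FileSystem fs, Path path,
      FsPermission perm, boolean overwrite, boolean syncBlocks)
      throws IOException {
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
    if (overwrite) {
      flags.add(CreateFlag.OVERWRITE); // replace an existing file
    }
    if (syncBlocks) {
      flags.add(CreateFlag.SYNC_BLOCK); // hsync() can then reach the disk
    }
    return fs.create(path, perm, flags,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        fs.getDefaultReplication(), fs.getDefaultBlockSize(),
        null); // no Progressable
  }
}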
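
Note on the probe-once reflection idiom: SequenceFileLogWriter resolves hsync() a single time in a static initializer, caches the Method, and permanently falls back to syncFs() if the lookup or a later invocation fails reflectively. Below is a standalone sketch of that idiom under the same assumptions; the class and method names are illustrative, not from the patch.

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.hadoop.io.SequenceFile;

public final class OptionalHsyncSketch {
  // Resolved once; null means "not available, use syncFs() from now on".
  private static Method hsync;
  static {
    try {
      hsync = SequenceFile.Writer.class.getMethod("hsync"); // HDFS-744 only
    } catch (Exception e) {
      hsync = null; // older Hadoop: syncFs() is the only option
    }
  }

  public static void sync(SequenceFile.Writer writer, boolean durable)
      throws IOException {
    if (durable && hsync != null) {
      try {
        hsync.invoke(writer);
        return;
      } catch (InvocationTargetException ite) {
        // hsync() itself failed; surface the error
        throw new IOException(ite);
      } catch (Exception e) {
        // reflection plumbing failed; never try again
        hsync = null;
      }
    }
    // syncFs() flushes to the DataNodes but not necessarily to disk
    writer.syncFs();
  }
}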