Index: src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (revision 1336873) +++ src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (working copy) @@ -266,6 +266,6 @@ FileSystem fs, Path path) throws IOException { FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); - return FSUtils.create(fs, path, perms); + return FSUtils.create(fs, path, perms, true, true); } } Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1336873) +++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -23,9 +23,11 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -158,6 +161,66 @@ } /** + * Create the specified file on the filesystem. By default, this will: + *
    + *
  1. apply the umask in the configuration (if it is enabled)
  2. + *
  3. use the fs configured buffer size (or {@value DEFAULT_BUFFER_SIZE} if + * not set)
  4. + *
  5. use the default replication
  6. + *
  7. use the default block size
  8. + *
  9. not track progress
  10. + *
+ * + * @param fs {@link FileSystem} on which to write the file + * @param path {@link Path} to the file to write + * @param perm + * @param overwrite Whether or not the created file should be overwritten. + * @return output stream to the created file + * @throws IOException if the file cannot be created + */ + @SuppressWarnings("unchecked") + public static FSDataOutputStream create(FileSystem fs, Path path, + FsPermission perm, boolean overwrite, boolean force) throws IOException { + LOG.debug("Creating file:" + path + "with permission:" + perm); + + try { + Class createFlagsClz = Class.forName("org.apache.hadoop.fs.CreateFlag"); + Object[] enums = createFlagsClz.getEnumConstants(); + EnumSet flags = EnumSet.of((Enum)enums[0]); // CREATE + if (overwrite) flags.add(enums[1]); // OVERWRITE + if (force) flags.add(enums[3]); // FORCE + + return (FSDataOutputStream) FileSystem.class + .getMethod( + "create", + new Class[] { Path.class, FsPermission.class, EnumSet.class, + Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class }) + .invoke( + fs, + new Object[] { + path, + perm, + flags, + fs.getConf().getInt("io.file.buffer.size", 4096), + (short) fs.getConf().getInt("hbase.regionserver.hlog.replication", + fs.getDefaultReplication()), + fs.getConf().getLong("hbase.regionserver.hlog.blocksize", + fs.getDefaultBlockSize()), + null }); + } catch (InvocationTargetException ite) { + // function was properly called, but threw it's own exception + throw new IOException(ite.getCause()); + } catch (Exception e) { + // ignore all other exceptions. related to reflection failure + } + LOG.debug("new createWriter -- HDFS-744 -- not available"); + + return fs.create(path, perm, overwrite, + fs.getConf().getInt("io.file.buffer.size", 4096), + fs.getDefaultReplication(), fs.getDefaultBlockSize(), null); + } + + /** * Get the file permissions specified in the configuration, if they are * enabled. * Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1336873) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy) @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.TreeMap; @@ -32,7 +33,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -40,6 +43,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; + /** * Implementation of {@link HLog.Writer} that delegates to * SequenceFile.Writer. @@ -64,7 +68,16 @@ private FSDataOutputStream writer_out; private Class keyClass; + private static Class optionClz = null; + static { + try { + // This class exists in hadoop 0.23+ + optionClz = Class.forName("org.apache.hadoop.io.SequenceFile$Writer$Option"); + } catch (Exception e) { + } + } + /** * Context used by our wal dictionary compressor. Null if we're not to do * our custom dictionary compression. This custom WAL compression is distinct @@ -106,7 +119,10 @@ } return new Metadata(metaMap); } - +enum c { + a, + b; +} /** * Call this method after init() has been executed * @@ -144,34 +160,73 @@ keyClass = HLog.getKeyClass(conf); } - // Create a SF.Writer instance. - try { - // reflection for a version of SequenceFile.createWriter that doesn't - // automatically create the parent directory (see HBASE-2312) - this.writer = (SequenceFile.Writer) SequenceFile.class - .getMethod("createWriter", new Class[] {FileSystem.class, - Configuration.class, Path.class, Class.class, Class.class, - Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE, - CompressionType.class, CompressionCodec.class, Metadata.class}) - .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf), - WALEdit.class, - Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)), - Short.valueOf((short) - conf.getInt("hbase.regionserver.hlog.replication", - fs.getDefaultReplication())), - Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize", - fs.getDefaultBlockSize())), - Boolean.valueOf(false) /*createParent*/, - SequenceFile.CompressionType.NONE, new DefaultCodec(), - createMetadata(conf, compress) - }); - } catch (InvocationTargetException ite) { - // function was properly called, but threw it's own exception - throw new IOException(ite.getCause()); - } catch (Exception e) { - // ignore all other exceptions. related to reflection failure + // reflection for a version of SequenceFile.createWriter that doesn't + // automatically create the parent directory (see HBASE-2312) + if (optionClz != null) { + try { + FSDataOutputStream out = FSUtils.create(fs, path, FsPermission.getDefault(), true, true); + Object optionArray = Array.newInstance(optionClz, 5); + Array.set(optionArray, 0, + SequenceFile.Writer.class + .getMethod("stream", new Class[] { FSDataOutputStream.class }) + .invoke(null, new Object[] { out })); + Array.set(optionArray, 1, + SequenceFile.Writer.class + .getMethod("keyClass", new Class[] { Class.class }) + .invoke(null, new Object[] { HLog.getKeyClass(conf) })); + Array.set(optionArray, 2, + SequenceFile.Writer.class + .getMethod("valueClass", new Class[] { Class.class }) + .invoke(null, new Object[] { WALEdit.class })); + Array.set(optionArray, 3, + SequenceFile.Writer.class + .getMethod("compression", new Class[] { CompressionType.class, CompressionCodec.class }) + .invoke(null, new Object[] { SequenceFile.CompressionType.NONE, new DefaultCodec() })); + Array.set(optionArray, 4, + SequenceFile.Writer.class + .getMethod("metadata", new Class[] { Metadata.class }) + .invoke(null, new Object[] { createMetadata(conf, compress) })); + this.writer = (SequenceFile.Writer) SequenceFile.class + .getMethod("createWriter", new Class[] { Configuration.class, optionArray.getClass() }) + .invoke(null, new Object[] { conf, optionArray }); + } catch (InvocationTargetException ite) { + // function was properly called, but threw it's own exception + throw new IOException(ite.getCause()); + } catch (Exception e) { + // ignore all other exceptions. related to reflection failure + } } + if (this.writer == null) { + LOG.debug("new createWriter -- HDFS-744 -- not available"); + try { + // reflection for a version of SequenceFile.createWriter that doesn't + // automatically create the parent directory (see HBASE-2312) + this.writer = (SequenceFile.Writer) SequenceFile.class + .getMethod("createWriter", new Class[] {FileSystem.class, + Configuration.class, Path.class, Class.class, Class.class, + Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE, + CompressionType.class, CompressionCodec.class, Metadata.class}) + .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf), + WALEdit.class, + Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)), + Short.valueOf((short) + conf.getInt("hbase.regionserver.hlog.replication", + fs.getDefaultReplication())), + Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize", + fs.getDefaultBlockSize())), + Boolean.valueOf(false) /*createParent*/, + SequenceFile.CompressionType.NONE, new DefaultCodec(), + createMetadata(conf, compress) + }); + } catch (InvocationTargetException ite) { + // function was properly called, but threw it's own exception + throw new IOException(ite.getCause()); + } catch (Exception e) { + // ignore all other exceptions. related to reflection failure + } + } + // if reflection failed, use the old createWriter if (this.writer == null) { LOG.debug("new createWriter -- HADOOP-6840 -- not available"); Index: pom.xml =================================================================== --- pom.xml (revision 1336873) +++ pom.xml (working copy) @@ -2022,6 +2022,87 @@ + + + hadoop-3.0.0 + + + hadoop.profile + 30 + + + + 3.0.0-SNAPSHOT + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-annotations + ${hadoop.version} + + + + org.apache.hadoop + hadoop-minicluster + ${hadoop.version} + compile + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-test-resource + + add-test-resource + + + + + src/test/resources + + hbase-site.xml + + + + + + + + + maven-dependency-plugin + + + create-mrapp-generated-classpath + generate-test-resources + + build-classpath + + + + ${project.build.directory}/test-classes/mrapp-generated-classpath + + + + + + + +