Index: src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (revision 1336873)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java (working copy)
@@ -266,6 +266,6 @@
FileSystem fs, Path path) throws IOException {
FsPermission perms = FSUtils.getFilePermissions(fs, conf,
HConstants.DATA_FILE_UMASK_KEY);
- return FSUtils.create(fs, path, perms);
+ return FSUtils.create(fs, path, perms, true, true);
}
}
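
The two booleans added at the call site select optional create behavior. A
minimal sketch of what they mean; the inline comments are illustrative, not
part of the patch:

    // overwrite an existing file, and request sync-on-flush blocks
    // (CreateFlag.SYNC_BLOCK, HDFS-744) where the running HDFS supports it
    FSDataOutputStream out = FSUtils.create(fs, path, perms,
        true /* overwrite */, true /* force sync-block */);
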
Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1336873)
+++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -23,9 +23,11 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +53,7 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -158,6 +161,66 @@
}
/**
+ * Create the specified file on the filesystem. By default, this will:
+ *
+ * - apply the umask in the configuration (if it is enabled)
+ * - use the fs configured buffer size (or 4096 if "io.file.buffer.size"
+ * is not set)
+ * - use the default replication
+ * - use the default block size
+ * - not track progress
+ *
+ * @param fs {@link FileSystem} on which to write the file
+ * @param path {@link Path} to the file to write
+ * @param perm permissions to set on the created file
+ * @param overwrite whether to overwrite the file if it already exists
+ * @param force if true, also request sync-on-flush semantics
+ * (CreateFlag.SYNC_BLOCK, HDFS-744) where the running HDFS supports it
+ * @return output stream to the created file
+ * @throws IOException if the file cannot be created
+ */
+ @SuppressWarnings("unchecked")
+ public static FSDataOutputStream create(FileSystem fs, Path path,
+ FsPermission perm, boolean overwrite, boolean force) throws IOException {
+ LOG.debug("Creating file:" + path + "with permission:" + perm);
+
+ try {
+ Class<?> createFlagsClz = Class.forName("org.apache.hadoop.fs.CreateFlag");
+ Object[] enums = createFlagsClz.getEnumConstants();
+ EnumSet flags = EnumSet.of((Enum) enums[0]); // CREATE
+ if (overwrite) flags.add((Enum) enums[1]); // OVERWRITE
+ if (force) flags.add((Enum) enums[3]); // SYNC_BLOCK (HDFS-744)
+
+ return (FSDataOutputStream) FileSystem.class
+ .getMethod(
+ "create",
+ new Class[] { Path.class, FsPermission.class, EnumSet.class,
+ Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class })
+ .invoke(
+ fs,
+ new Object[] {
+ path,
+ perm,
+ flags,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ (short) fs.getConf().getInt("hbase.regionserver.hlog.replication",
+ fs.getDefaultReplication()),
+ fs.getConf().getLong("hbase.regionserver.hlog.blocksize",
+ fs.getDefaultBlockSize()),
+ null });
+ } catch (InvocationTargetException ite) {
+ // the method was properly invoked, but threw its own exception
+ throw new IOException(ite.getCause());
+ } catch (Exception e) {
+ // any other exception means the reflective lookup failed;
+ // fall through to the pre-HDFS-744 create() below
+ }
+ LOG.debug("new createWriter -- HDFS-744 -- not available");
+
+ return fs.create(path, perm, overwrite,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
+ }
+
+ /**
* Get the file permissions specified in the configuration, if they are
* enabled.
*
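
On Hadoop releases that ship HDFS-744, the reflective block above resolves to a
concrete FileSystem.create() overload. Compiled directly against that API, the
body of FSUtils.create() would read roughly as below. This is a sketch assuming
CreateFlag declares CREATE, OVERWRITE and SYNC_BLOCK; it is also why a
name-based lookup (Enum.valueOf on the loaded class) would be sturdier than the
positional enums[1]/enums[3] indexing the patch uses:

    // Hedged sketch: direct, non-reflective equivalent on hadoop 0.23+.
    // Needs: import java.util.EnumSet; import org.apache.hadoop.fs.CreateFlag;
    // CreateFlag and this create() overload do not exist on older Hadoop,
    // which is the only reason the patch goes through reflection.
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
    if (overwrite) flags.add(CreateFlag.OVERWRITE);
    if (force) flags.add(CreateFlag.SYNC_BLOCK);
    return fs.create(path, perm, flags,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        (short) fs.getConf().getInt("hbase.regionserver.hlog.replication",
            fs.getDefaultReplication()),
        fs.getConf().getLong("hbase.regionserver.hlog.blocksize",
            fs.getDefaultBlockSize()),
        null); // no Progressable; progress is not tracked
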
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1336873)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy)
@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;
@@ -32,7 +33,9 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -40,6 +43,7 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+
/**
* Implementation of {@link HLog.Writer} that delegates to
* SequenceFile.Writer.
@@ -64,7 +68,16 @@
private FSDataOutputStream writer_out;
private Class<? extends HLogKey> keyClass;
+ private static Class<?> optionClz = null;
+ static {
+ try {
+ // This class exists in hadoop 0.23+
+ optionClz = Class.forName("org.apache.hadoop.io.SequenceFile$Writer$Option");
+ } catch (Exception e) {
+ // class not found: not running on hadoop 0.23+, so optionClz stays null
+ }
+ }
+
/**
* Context used by our wal dictionary compressor. Null if we're not to do
* our custom dictionary compression. This custom WAL compression is distinct
@@ -106,7 +119,10 @@
}
return new Metadata(metaMap);
}
-
/**
* Call this method after init() has been executed
*
@@ -144,34 +160,73 @@
keyClass = HLog.getKeyClass(conf);
}
- // Create a SF.Writer instance.
- try {
- // reflection for a version of SequenceFile.createWriter that doesn't
- // automatically create the parent directory (see HBASE-2312)
- this.writer = (SequenceFile.Writer) SequenceFile.class
- .getMethod("createWriter", new Class[] {FileSystem.class,
- Configuration.class, Path.class, Class.class, Class.class,
- Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
- CompressionType.class, CompressionCodec.class, Metadata.class})
- .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
- WALEdit.class,
- Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
- Short.valueOf((short)
- conf.getInt("hbase.regionserver.hlog.replication",
- fs.getDefaultReplication())),
- Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
- fs.getDefaultBlockSize())),
- Boolean.valueOf(false) /*createParent*/,
- SequenceFile.CompressionType.NONE, new DefaultCodec(),
- createMetadata(conf, compress)
- });
- } catch (InvocationTargetException ite) {
- // function was properly called, but threw it's own exception
- throw new IOException(ite.getCause());
- } catch (Exception e) {
- // ignore all other exceptions. related to reflection failure
+ // reflection for a version of SequenceFile.createWriter that doesn't
+ // automatically create the parent directory (see HBASE-2312)
+ if (optionClz != null) {
+ try {
+ FSDataOutputStream out = FSUtils.create(fs, path, FsPermission.getDefault(), true, true);
+ Object optionArray = Array.newInstance(optionClz, 5);
+ Array.set(optionArray, 0,
+ SequenceFile.Writer.class
+ .getMethod("stream", new Class[] { FSDataOutputStream.class })
+ .invoke(null, new Object[] { out }));
+ Array.set(optionArray, 1,
+ SequenceFile.Writer.class
+ .getMethod("keyClass", new Class[] { Class.class })
+ .invoke(null, new Object[] { HLog.getKeyClass(conf) }));
+ Array.set(optionArray, 2,
+ SequenceFile.Writer.class
+ .getMethod("valueClass", new Class[] { Class.class })
+ .invoke(null, new Object[] { WALEdit.class }));
+ Array.set(optionArray, 3,
+ SequenceFile.Writer.class
+ .getMethod("compression", new Class[] { CompressionType.class, CompressionCodec.class })
+ .invoke(null, new Object[] { SequenceFile.CompressionType.NONE, new DefaultCodec() }));
+ Array.set(optionArray, 4,
+ SequenceFile.Writer.class
+ .getMethod("metadata", new Class[] { Metadata.class })
+ .invoke(null, new Object[] { createMetadata(conf, compress) }));
+ this.writer = (SequenceFile.Writer) SequenceFile.class
+ .getMethod("createWriter", new Class[] { Configuration.class, optionArray.getClass() })
+ .invoke(null, new Object[] { conf, optionArray });
+ } catch (InvocationTargetException ite) {
+ // the method was properly invoked, but threw its own exception
+ throw new IOException(ite.getCause());
+ } catch (Exception e) {
+ // any other exception means the reflective lookup failed;
+ // fall through to the older createWriter paths below
+ }
}
+ if (this.writer == null) {
+ LOG.debug("new createWriter -- HDFS-744 -- not available");
+ try {
+ // reflection for a version of SequenceFile.createWriter that doesn't
+ // automatically create the parent directory (see HBASE-2312)
+ this.writer = (SequenceFile.Writer) SequenceFile.class
+ .getMethod("createWriter", new Class[] {FileSystem.class,
+ Configuration.class, Path.class, Class.class, Class.class,
+ Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
+ CompressionType.class, CompressionCodec.class, Metadata.class})
+ .invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
+ WALEdit.class,
+ Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
+ Short.valueOf((short)
+ conf.getInt("hbase.regionserver.hlog.replication",
+ fs.getDefaultReplication())),
+ Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
+ fs.getDefaultBlockSize())),
+ Boolean.valueOf(false) /*createParent*/,
+ SequenceFile.CompressionType.NONE, new DefaultCodec(),
+ createMetadata(conf, compress)
+ });
+ } catch (InvocationTargetException ite) {
+ // the method was properly invoked, but threw its own exception
+ throw new IOException(ite.getCause());
+ } catch (Exception e) {
+ // any other exception means the reflective lookup failed;
+ // fall through to the pre-HADOOP-6840 createWriter below
+ }
+ }
+
// if reflection failed, use the old createWriter
if (this.writer == null) {
LOG.debug("new createWriter -- HADOOP-6840 -- not available");
Index: pom.xml
===================================================================
--- pom.xml (revision 1336873)
+++ pom.xml (working copy)
@@ -2022,6 +2022,87 @@
+    <profile>
+      <id>hadoop-3.0.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>30</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>3.0.0-SNAPSHOT</hadoop.version>
+      </properties>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>compile</scope>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-test-resource</id>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      <directory>src/test/resources</directory>
+                      <includes>
+                        <include>hbase-site.xml</include>
+                      </includes>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>create-mrapp-generated-classpath</id>
+                <phase>generate-test-resources</phase>
+                <goals>
+                  <goal>build-classpath</goal>
+                </goals>
+                <configuration>
+                  <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
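
The profile is selected through the hadoop.profile property declared in its
activation block, e.g.:

    mvn clean install -Dhadoop.profile=30

hadoop-minicluster comes in at compile scope so the test classpath resolves,
and the maven-dependency-plugin execution writes the mrapp-generated-classpath
file that MiniMRCluster-based tests look for on 0.23-line and later Hadoop.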