From d6e25b2da1ea967b4972f2bef53498a0617f5829 Mon Sep 17 00:00:00 2001 From: Alex Leblang Date: Fri, 22 Dec 2017 13:40:33 -0600 Subject: [PATCH] HBASE-19369 Switch to Builder Pattern In WAL This patch switches to the builder pattern by adding a helper method. It also checks to ensure that the pattern is available (i.e. that HBase is running on a hadoop version that supports it). Signed-off-by: Mike Drob --- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 7 +- .../hbase/regionserver/wal/ProtobufLogWriter.java | 4 +- .../java/org/apache/hadoop/hbase/util/FSUtils.java | 52 ++++++++++++++ .../hbase/regionserver/wal/TestHBaseOnEC.java | 81 ++++++++++++++++++++++ 4 files changed, 137 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 43ddfb06cb..609ac6e54f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.yetus.audience.InterfaceAudience; @@ -54,11 +55,7 @@ public final class AsyncFSOutputHelper { final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - if (createParent) { - out = fs.create(f, overwrite, bufferSize, replication, blockSize, null); - } else { - out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); - } + out = FSUtils.createHelper(fs, f, overwrite, bufferSize, replication, blockSize, createParent); // After we create the stream but before we attempt to use it at all // ensure that we can provide the level of data safety we're configured // to provide. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index aeb2c19c25..fed7db7935 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -88,8 +89,7 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { - this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, - null); + this.output = FSUtils.createHelper(fs, path, overwritable, bufferSize, replication, blockSize, false); // TODO Be sure to add a check for hsync if this branch includes HBASE-19024 if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) && !(CommonFSUtils.hasCapability(output, "hflush"))) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 1620fd8bee..b5ae18349f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -841,6 +841,58 @@ public abstract class FSUtils extends CommonFSUtils { return frags; } + /** + * Attempt to use builder API via reflection to create a file with the given parameters and replication enabled. + */ + public static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable, int bufferSize, + short replication, long blockSize, boolean isRecursive) throws IOException { + FSDataOutputStream output = null; + + if (fs instanceof DistributedFileSystem) { + try { + Method createMethod = DistributedFileSystem.class.getMethod("createFile", Path.class); + + Object builder = createMethod.invoke(fs, path); + + Method overwriteMethod = builder.getClass().getMethod("overwrite", boolean.class); + builder = overwriteMethod.invoke(builder, overwritable); + + Method bufferSizeMethod = builder.getClass().getMethod("bufferSize", int.class); + builder = bufferSizeMethod.invoke(builder, bufferSize); + + Method blockSizeMethod = builder.getClass().getMethod("blockSize", long.class); + builder = blockSizeMethod.invoke(builder, blockSize); + + Method replicateMethod = builder.getClass().getMethod("replicate"); + builder = replicateMethod.invoke(builder); + + // Set the proper recursive flag if applicable + if (isRecursive) { + Method recursiveMethod = builder.getClass().getMethod("recursive"); + builder = recursiveMethod.invoke(builder); + } + + Method replicationMethod = builder.getClass().getMethod("replication", short.class); + builder = replicationMethod.invoke(builder, replication); + + Method buildMethod = builder.getClass().getMethod("build"); + output = (FSDataOutputStream) buildMethod.invoke(builder); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // Couldn't reflect to build an output stream, probably on an old version of hadoop + } + } + + if (output == null) { + if (isRecursive) { + output = fs.create(path, overwritable, bufferSize, replication, blockSize, null); + } else { + output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); + } + } + + return output; + } + /** * A {@link PathFilter} that returns only regular files. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java new file mode 100644 index 0000000000..48a4ab18ce --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Method; + +// @Category LargeTest +public class TestHBaseOnEC { + private static final HBaseTestingUtility util = new HBaseTestingUtility(); + + final byte[] row = Bytes.toBytes("row"); + final byte[] cf = Bytes.toBytes("cf"); + final byte[] cq = Bytes.toBytes("cq"); + final byte[] value = Bytes.toBytes("value"); + + @BeforeClass + public static void setup() throws Exception { + util.startMiniCluster(); + MiniDFSCluster cluster = util.getDFSCluster(); + + try { + Method m = DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class); + m.invoke(null, cluster.getFileSystem()); + } catch (NoSuchMethodException e) { + // We are on an older version of hadoop, don't set the policies + } + } + + @AfterClass + public static void tearDown() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testBasic() throws IOException { + TableName name = TableName.valueOf(getClass().getSimpleName()); + + Table t = util.createTable(name, cf); + t.put(new Put(row).addColumn(cf, cq, value)); + + util.getAdmin().flush(name); + + assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq)); + } +} + -- 2.15.0