From 829a6a9b1b2fa918da3b77c1b6f1fc653c5ad407 Mon Sep 17 00:00:00 2001
From: Alex Leblang
Date: Wed, 29 Nov 2017 17:11:54 -0500
Subject: [PATCH] HBASE-19369 Switch to Builder Pattern In WAL

This patch switches to the builder pattern by adding a helper method.
It also checks to ensure that the pattern is available (i.e., that
HBase is running on a Hadoop version that supports it).
---
 .../apache/hadoop/hbase/util/CommonFSUtils.java    |  1 -
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java      |  5 +-
 .../hbase/regionserver/wal/ProtobufLogWriter.java  |  5 +-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 66 ++++++++++++++++
 .../hbase/regionserver/wal/TestHBaseOnEC.java      | 72 +++++++++++++++++++
 5 files changed, 144 insertions(+), 5 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java
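Reviewer note: FSUtils#createHelper below goes through reflection because HBase
still compiles against Hadoop versions that may lack the create-builder API.
On a version where the builder is available at compile time, the helper's happy
path corresponds roughly to the following sketch. This is illustrative only:
the class name BuilderSketch and the variable dfs are made up for the example,
and the chained calls are assumed from Hadoop's create-builder API rather than
taken from this patch. The replicate() call is the reason the builder matters
here: it asks for a plain replicated file, so the WAL keeps hflush/hsync even
in an erasure coded directory.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class BuilderSketch {
  // What FSUtils#createHelper does through reflection, written directly
  // against a Hadoop version where the builder API exists at compile time.
  public static FSDataOutputStream create(DistributedFileSystem dfs, Path path,
      boolean overwritable, int bufferSize, short replication, long blockSize,
      boolean isRecursive) throws IOException {
    if (isRecursive) {
      return dfs.createFile(path).overwrite(overwritable).bufferSize(bufferSize)
          .blockSize(blockSize).replication(replication)
          .replicate()   // request a replicated, non-erasure-coded file
          .recursive()   // create any missing parent directories
          .build();
    }
    return dfs.createFile(path).overwrite(overwritable).bufferSize(bufferSize)
        .blockSize(blockSize).replication(replication).replicate().build();
  }
}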
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 2a83a5d..6693a2b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -889,5 +889,4 @@ public abstract class CommonFSUtils {
       super(message);
     }
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 43ddfb0..cc01ba6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -55,9 +56,9 @@ public final class AsyncFSOutputHelper {
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     if (createParent) {
-      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
+      out = FSUtils.createHelper(fs, f, overwrite, bufferSize, replication, blockSize, true);
     } else {
-      out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
+      out = FSUtils.createHelper(fs, f, overwrite, bufferSize, replication, blockSize, false);
     }
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 7a135c9..a5cc8fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@@ -90,8 +91,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
-        null);
+    this.output = FSUtils.createHelper(fs, path, overwritable, bufferSize, replication,
+        blockSize, false);
     // TODO Be sure to add a check for hsync if this branch includes HBASE-19024
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
         !(CommonFSUtils.hasCapability(output, "hflush"))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 81fcaf2..8dc4b30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -841,6 +841,72 @@ public abstract class FSUtils extends CommonFSUtils {
     return frags;
   }
 
+  /**
+   * Create an output stream using the builder API when the running version of Hadoop provides
+   * it, falling back to the pre-builder create calls otherwise. Going through the builder lets
+   * us request a replicated (rather than erasure coded) file, which the WAL needs for working
+   * hflush/hsync.
+   */
+  public static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
+      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    FSDataOutputStream output = null;
+
+    if (fs instanceof DistributedFileSystem) {
+      try {
+        // DistributedFileSystem#createFile and the builder it returns only exist on Hadoop
+        // versions that support the builder pattern, so resolve them reflectively.
+        java.lang.reflect.Method createMethod =
+            DistributedFileSystem.class.getMethod("createFile", Path.class);
+        Object builder = createMethod.invoke(fs, path);
+
+        java.lang.reflect.Method overwriteMethod =
+            builder.getClass().getMethod("overwrite", boolean.class);
+        builder = overwriteMethod.invoke(builder, overwritable);
+
+        java.lang.reflect.Method bufferSizeMethod =
+            builder.getClass().getMethod("bufferSize", int.class);
+        builder = bufferSizeMethod.invoke(builder, bufferSize);
+
+        java.lang.reflect.Method blockSizeMethod =
+            builder.getClass().getMethod("blockSize", long.class);
+        builder = blockSizeMethod.invoke(builder, blockSize);
+
+        // Ask for a replicated file so the stream keeps hflush/hsync support even when the
+        // target directory carries an erasure coding policy.
+        java.lang.reflect.Method replicateMethod = builder.getClass().getMethod("replicate");
+        builder = replicateMethod.invoke(builder);
+
+        // Set the proper recursive flag if applicable
+        if (isRecursive) {
+          java.lang.reflect.Method recursiveMethod = builder.getClass().getMethod("recursive");
+          builder = recursiveMethod.invoke(builder);
+        }
+
+        java.lang.reflect.Method replicationMethod =
+            builder.getClass().getMethod("replication", short.class);
+        builder = replicationMethod.invoke(builder, replication);
+
+        java.lang.reflect.Method buildMethod = builder.getClass().getMethod("build");
+        output = (FSDataOutputStream) buildMethod.invoke(builder);
+      } catch (NoSuchMethodException | IllegalAccessException
+          | java.lang.reflect.InvocationTargetException e) {
+        // Couldn't reflect to build an output stream; we are probably running on an older
+        // version of Hadoop without the builder API. Fall through to the classic calls below.
+      }
+    }
+
+    if (output == null) {
+      if (isRecursive) {
+        output = fs.create(path, overwritable, bufferSize, replication, blockSize, null);
+      } else {
+        output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
+            null);
+      }
+    }
+
+    return output;
+  }
+
   /**
    * A {@link PathFilter} that returns only regular files.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java
new file mode 100644
index 0000000..ebce27c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHBaseOnEC {
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util.startMiniCluster();
+    MiniDFSCluster cluster = util.getDFSCluster();
+    // Make the built-in erasure coding policies available to the cluster.
+    DFSTestUtil.enableAllECPolicies(cluster.getFileSystem());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBasic() throws IOException {
+    String value = "value";
+
+    Table t = util.createTable(TableName.valueOf("TestHBaseOnEC"), "f");
+    Put p = new Put(ROW).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
+    t.put(p);
+
+    byte[] retValue = t.get(new Get(ROW)).getValue(FAMILY, QUALIFIER);
+    assertEquals(value, Bytes.toString(retValue));
+  }
+}
-- 
2.7.4 (Apple Git-66)
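A closing note on the new test: DFSTestUtil.enableAllECPolicies only makes the
built-in erasure coding policies available on the cluster; a file actually
becomes erasure coded once its directory carries a policy. A minimal sketch of
applying one follows. The directory path, policy name, and class name are
illustrative, and setErasureCodingPolicy is assumed from the Hadoop 3 HDFS
API; none of this is part of the patch above.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class EcPolicySketch {
  // Apply a built-in Reed-Solomon policy to a directory. New files created
  // under it are then erasure coded unless the writer opts back into
  // replication, which is what FSUtils#createHelper's replicate() call does
  // for the WAL.
  public static void apply(DistributedFileSystem dfs) throws IOException {
    dfs.setErasureCodingPolicy(new Path("/hbase"), "RS-6-3-1024k");
  }
}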