From 9daeb68a41d8622fa476ed657eab2d67f6b2b082 Mon Sep 17 00:00:00 2001 From: Alex Leblang Date: Fri, 22 Dec 2017 13:40:33 -0600 Subject: [PATCH] HBASE-19369 Switch to Builder Pattern In WAL This patch switches to the builder pattern by adding a helper method. It also checks to ensure that the pattern is available (i.e. that HBase is running on a hadoop version that supports it). Amending-Author: Mike Drob --- .../apache/hadoop/hbase/util/CommonFSUtils.java | 134 +++++++++++++++++++++ .../procedure2/store/wal/WALProcedureStore.java | 2 +- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 7 +- .../hbase/regionserver/wal/ProtobufLogWriter.java | 22 ++-- .../hbase/regionserver/wal/TestHBaseOnEC.java | 80 ++++++++++++ 5 files changed, 231 insertions(+), 14 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 5a6625bd4d..fb3794d11e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -806,6 +806,140 @@ public abstract class CommonFSUtils { conf.setIfUnset(dfsKey, Integer.toString(hbaseSize)); } + private static class DfsBuilderUtility { + static Class dfsClass = null; + static Method createMethod; + static Method overwriteMethod; + static Method bufferSizeMethod; + static Method blockSizeMethod; + static Method recursiveMethod; + static Method replicateMethod; + static Method replicationMethod; + static Method buildMethod; + static boolean allMethodsPresent = false; + + static { + String dfsClassName = "org.apache.hadoop.hdfs.DistributedFileSystem"; + String builderClassName = dfsClassName + ".HdfsDataOutputStreamBuilder"; + Class builderClass = null; + + try { + dfsClass = Class.forName(dfsClassName); + } catch (ClassNotFoundException e) { + LOG.info("{} not available, will not use builder API for file creation.", dfsClassName); + } + try { + builderClass = Class.forName(builderClassName); + } catch (ClassNotFoundException e) { + LOG.info("{} not available, will not use builder API for file creation.", builderClassName); + } + + if (dfsClass != null && builderClass != null) { + try { + createMethod = dfsClass.getMethod("createFile", Path.class); + overwriteMethod = builderClass.getMethod("overwrite", boolean.class); + bufferSizeMethod = builderClass.getMethod("bufferSize", int.class); + blockSizeMethod = builderClass.getMethod("blockSize", long.class); + recursiveMethod = builderClass.getMethod("recursive"); + replicateMethod = builderClass.getMethod("replicate"); + replicationMethod = builderClass.getMethod("replication", short.class); + buildMethod = builderClass.getMethod("build"); + + allMethodsPresent = true; + } catch (NoSuchMethodException e) { + LOG.info("Could not find method on builder; will use old DFS API for file creation: {}", + e.getMessage()); + } + } + } + + /** + * Attempt to use builder API via reflection to create a file with the given parameters and + * replication enabled. + */ + public static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable, + int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { + FSDataOutputStream output = null; + + if (allMethodsPresent && dfsClass.isInstance(fs)) { + try { + Object builder; + + builder = createMethod.invoke(fs, path); + builder = overwriteMethod.invoke(builder, overwritable); + builder = bufferSizeMethod.invoke(builder, bufferSize); + builder = blockSizeMethod.invoke(builder, blockSize); + if (isRecursive) { + builder = recursiveMethod.invoke(builder); + } + builder = replicateMethod.invoke(builder); + builder = replicationMethod.invoke(builder, replication); + output = (FSDataOutputStream) buildMethod.invoke(builder); + } catch (IllegalAccessException | InvocationTargetException e) { + // Couldn't reflect to build an output stream, probably on an old version of hadoop + } + } + + if (output == null) { + if (isRecursive) { + output = fs.create(path, overwritable, bufferSize, replication, blockSize, null); + } else { + output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, + null); + } + } + + return output; + } + + /** + * Attempt to use builder API via reflection to create a file with the given parameters and + * replication enabled. + */ + public static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable) + throws IOException { + FSDataOutputStream output = null; + + if (allMethodsPresent && dfsClass.isInstance(fs)) { + try { + Object builder; + + builder = createMethod.invoke(fs, path); + builder = overwriteMethod.invoke(builder, overwritable); + builder = replicateMethod.invoke(builder); + output = (FSDataOutputStream) buildMethod.invoke(builder); + } catch (IllegalAccessException | InvocationTargetException e) { + // Couldn't reflect to build an output stream, probably on an old version of hadoop + } + } + + if (output == null) { + output = fs.create(path, overwritable); + } + + return output; + } + } + + /** + * Attempt to use builder API via reflection to create a file with the given parameters and + * replication enabled. + */ + public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable) + throws IOException { + return DfsBuilderUtility.createHelper(fs, path, overwritable); + } + + /** + * Attempt to use builder API via reflection to create a file with the given parameters and + * replication enabled. + */ + public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable, + int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException { + return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication, + blockSize, isRecursive); + } + // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and // not until we attempt to reference it. private static class StreamCapabilities { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 521ae7e8ca..b6543c3800 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -1020,7 +1020,7 @@ public class WALProcedureStore extends ProcedureStoreBase { long startPos = -1; newLogFile = getLogFilePath(logId); try { - newStream = fs.create(newLogFile, false); + newStream = CommonFSUtils.createForWal(fs, newLogFile, false); } catch (FileAlreadyExistsException e) { LOG.error("Log file with id=" + logId + " already exists", e); return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 43ddfb06cb..325da7c198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -54,11 +54,8 @@ public final class AsyncFSOutputHelper { final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); - if (createParent) { - out = fs.create(f, overwrite, bufferSize, replication, blockSize, null); - } else { - out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); - } + out = CommonFSUtils.createForWal(fs, f, overwrite, bufferSize, replication, blockSize, + createParent); // After we create the stream but before we attempt to use it at all // ensure that we can provide the level of data safety we're configured // to provide. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index aeb2c19c25..8ab594f0f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -21,19 +21,21 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.io.OutputStream; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; -import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.wal.FSHLogProvider; -import org.apache.hadoop.hbase.wal.WAL.Entry; /** * Writer for protobuf-based WAL. @@ -62,7 +64,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter public void close() throws IOException { if (this.output != null) { try { - if (!trailerWritten) writeWALTrailer(); + if (!trailerWritten) { + writeWALTrailer(); + } this.output.close(); } catch (NullPointerException npe) { // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close @@ -75,7 +79,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override public void sync() throws IOException { FSDataOutputStream fsdos = this.output; - if (fsdos == null) return; // Presume closed + if (fsdos == null) { + return; // Presume closed + } fsdos.flush(); fsdos.hflush(); } @@ -88,8 +94,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { - this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, - null); + this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, + blockSize, false); // TODO Be sure to add a check for hsync if this branch includes HBASE-19024 if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) && !(CommonFSUtils.hasCapability(output, "hflush"))) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java new file mode 100644 index 0000000000..9d18e2279c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseOnEC.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertArrayEquals; + +import java.io.IOException; +import java.lang.reflect.Method; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestHBaseOnEC { + private static final HBaseTestingUtility util = new HBaseTestingUtility(); + + final byte[] row = Bytes.toBytes("row"); + final byte[] cf = Bytes.toBytes("cf"); + final byte[] cq = Bytes.toBytes("cq"); + final byte[] value = Bytes.toBytes("value"); + + @BeforeClass + public static void setup() throws Exception { + util.startMiniCluster(); + MiniDFSCluster cluster = util.getDFSCluster(); + + try { + Method m = DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class); + m.invoke(null, cluster.getFileSystem()); + } catch (NoSuchMethodException e) { + // We are on an older version of hadoop, don't set the policies + } + } + + @AfterClass + public static void tearDown() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testBasic() throws IOException { + TableName name = TableName.valueOf(getClass().getSimpleName()); + + Table t = util.createTable(name, cf); + t.put(new Put(row).addColumn(cf, cq, value)); + + util.getAdmin().flush(name); + + assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq)); + } +} + -- 2.15.0