From 28ddd7baa2c43635e6170fb099acd8139e68942e Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Mon, 26 Jun 2017 11:29:34 -0500 Subject: [PATCH] HBASE-18177 ClientProtocol::create has API changes between Hadoop 2/3 --- .../FanOutOneBlockAsyncDFSOutputHelper.java | 62 +++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 3eaacc42f7..2967556b16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -197,6 +197,32 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final ChecksumCreater CHECKSUM_CREATER; + // helper class for creating files. + private interface FileCreator { + default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, + String clientName, EnumSetWritable flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions) throws Exception { + try { + return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent, + replication, blockSize, supportedVersions); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + }; + + Object createObject(ClientProtocol instance, String src, FsPermission masked, + String clientName, EnumSetWritable flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions) throws Exception; + } + + private static final FileCreator FILE_CREATOR; + private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); isClientRunningMethod.setAccessible(true); @@ -462,6 +488,39 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf")); } + private static FileCreator createFileCreator3() throws NoSuchMethodException { + Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, + String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, + String.class); + + return (instance, src, masked, clientName, flag, createParent, replication, blockSize, + supportedVersions) -> { + return (HdfsFileStatus) createMethod.invoke(instance, + src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, + null); + }; + } + + private static FileCreator createFileCreator2() throws NoSuchMethodException { + Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, + String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class); + + return (instance, src, masked, clientName, flag, createParent, replication, blockSize, + supportedVersions) -> { + return (HdfsFileStatus) createMethod.invoke(instance, + src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions); + }; + } + + private static FileCreator createFileCreator() throws NoSuchMethodException { + try { + return createFileCreator3(); + } catch (NoSuchMethodException e) { + LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x"); + } + return createFileCreator2(); + } + // cancel the processing if DFSClient is already closed. static final class CancelOnClose implements CancelableProgressable { @@ -486,6 +545,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); PB_HELPER = createPBHelper(); CHECKSUM_CREATER = createChecksumCreater(); + FILE_CREATOR = createFileCreator(); } catch (Exception e) { String msg = "Couldn't properly initialize access to HDFS internals. Please " + "update your WAL Provider to not make use of the 'asyncfs' provider. See " @@ -680,7 +740,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { ClientProtocol namenode = client.getNamenode(); HdfsFileStatus stat; try { - stat = namenode.create(src, + stat = FILE_CREATOR.create(namenode, src, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), createParent, replication, blockSize, CryptoProtocolVersion.supported()); -- 2.13.0