From f826b49c191293a7beb653d40da8f005efa4b8bc Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Mon, 27 Jun 2016 15:02:57 +0800
Subject: [PATCH] HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+

---
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   4 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 201 ++++++++++++++++++---
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    |  12 +-
 3 files changed, 191 insertions(+), 26 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 8dd7f5e..9aab924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static io.netty.handler.timeout.IdleState.READER_IDLE;
 import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
@@ -71,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -339,7 +339,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.alloc = alloc;
     this.buf = alloc.directBuffer();
     this.state = State.STREAMING;
-    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
+    setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2e88ff2..b27f614 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,15 +99,15 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -128,8 +128,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // copied from DFSPacket since it is package private.
   public static final long HEART_BEAT_SEQNO = -1L;
 
-  // helper class for creating DataChecksum object.
-  private static final Method CREATE_CHECKSUM;
+  // Timeouts for communicating with DataNode for streaming writes/reads
+  public static final int READ_TIMEOUT = 60 * 1000;
+  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
+  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
 
   // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
   // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -161,6 +163,17 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final FileCreater FILE_CREATER;
 
+  // helper class for calling add block method on namenode. There is an addBlockFlags parameter for
+  // hadoop 2.8 or later. See createBlockAdder for more details.
+  private interface BlockAdder {
+
+    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+        throws IOException;
+  }
+
+  private static final BlockAdder BLOCK_ADDER;
+
   // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and
   // hadoop 2.5 or after use inodeId. See createLeaseManager for more details.
   private interface LeaseManager {
@@ -181,6 +194,23 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
 
+  // helper class for converting protos.
+  private interface PBHelper {
+
+    ExtendedBlockProto convert(final ExtendedBlock b);
+
+    TokenProto convert(Token<?> tok);
+  }
+
+  private static final PBHelper PB_HELPER;
+
+  // helper class for creating data checksum.
+  private interface ChecksumCreater {
+    DataChecksum createChecksum(Object conf);
+  }
+
+  private static final ChecksumCreater CHECKSUM_CREATER;
+
   private static DFSClientAdaptor createDFSClientAdaptor() {
     try {
       final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
@@ -415,6 +445,143 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     throw new Error("No create method found for " + ClientProtocol.class.getName());
   }
 
+  private static BlockAdder createBlockAdder() {
+    for (Method method : ClientProtocol.class.getMethods()) {
+      if (method.getName().equals("addBlock")) {
+        final Method addBlockMethod = method;
+        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+        if (paramTypes[paramTypes.length - 1] == String[].class) {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                  excludeNodes, fileId, favoredNodes);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        } else {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                  excludeNodes, fileId, favoredNodes, null);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+    }
+    throw new Error("No addBlock method found for " + ClientProtocol.class.getName());
+  }
+
+  private static PBHelper createPBHelper() {
+    Class<?> helperClass;
+    try {
+      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No PBHelperClient class found, should be hadoop 2.7-");
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+    }
+    try {
+      final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+      final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+      return new PBHelper() {
+
+        @Override
+        public ExtendedBlockProto convert(ExtendedBlock b) {
+          try {
+            return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+          } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public TokenProto convert(Token<?> tok) {
+          try {
+            return (TokenProto) convertTokenMethod.invoke(null, tok);
+          } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    } catch (NoSuchMethodException e) {
+      throw new Error(e);
+    }
+  }
+
+  private static ChecksumCreater createChecksumCreater28(Class<?> confClass) {
+    for (Method method : confClass.getMethods()) {
+      if (method.getName().equals("createChecksum")) {
+        final Method createChecksumMethod = method;
+        return new ChecksumCreater() {
+
+          @Override
+          public DataChecksum createChecksum(Object conf) {
+            try {
+              return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      }
+    }
+    throw new Error("No createChecksum method found");
+  }
+
+  private static ChecksumCreater createChecksumCreater27(Class<?> confClass) {
+    final Method createChecksumMethod;
+    try {
+      createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    } catch (NoSuchMethodException e) {
+      throw new Error(e);
+    }
+    createChecksumMethod.setAccessible(true);
+    return new ChecksumCreater() {
+
+      @Override
+      public DataChecksum createChecksum(Object conf) {
+        try {
+          return (DataChecksum) createChecksumMethod.invoke(conf);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater() {
+    try {
+      return createChecksumCreater28(Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No DfsClientConf class found, should be hadoop 2.7-");
+    }
+    try {
+      return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+    } catch (ClassNotFoundException e) {
+      throw new Error(e);
+    }
+  }
+
   // cancel the processing if DFSClient is already closed.
   static final class CancelOnClose implements CancelableProgressable {
@@ -431,18 +598,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   static {
-    try {
-      CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum");
-      CREATE_CHECKSUM.setAccessible(true);
-    } catch (NoSuchMethodException e) {
-      throw new Error(e);
-    }
-
     PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
     STORAGE_TYPE_SETTER = createStorageTypeSetter();
     FILE_CREATER = createFileCreater();
+    BLOCK_ADDER = createBlockAdder();
     LEASE_MANAGER = createLeaseManager();
     DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+    PB_HELPER = createPBHelper();
+    CHECKSUM_CREATER = createChecksumCreater();
   }
 
   static void beginFileLease(DFSClient client, String src, long inodeId) {
@@ -454,11 +617,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   static DataChecksum createChecksum(DFSClient client) {
-    try {
-      return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf());
-    } catch (IllegalAccessException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
+    return CHECKSUM_CREATER.createChecksum(client.getConf());
   }
 
   static Status getStatus(PipelineAckProto ack) {
@@ -568,15 +727,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     boolean connectToDnViaHostname =
         conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
     final int timeoutMs =
-        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT);
+        conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
     ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
     blockCopy.setNumBytes(locatedBlock.getBlockSize());
     ClientOperationHeaderProto header = ClientOperationHeaderProto
         .newBuilder()
         .setBaseHeader(
-          BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy))
-              .setToken(PBHelper.convert(locatedBlock.getBlockToken())))
+          BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+              .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
         .setClientName(clientName).build();
     ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
     final OpWriteBlockProto.Builder writeBlockProtoBuilder =
@@ -663,8 +822,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     List<Future<Channel>> futureList = null;
     try {
       DataChecksum summer = createChecksum(client);
-      locatedBlock =
-          namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null);
+      locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
+        stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
         PIPELINE_SETUP_CREATE,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 33e8841..01891b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -451,9 +450,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
       final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
 
-      final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions",
+      Class<?> pbHelperClass;
+      try {
+        pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+      } catch (ClassNotFoundException e) {
+        LOG.warn("No PBHelperClient class found, should be hadoop 2.7-");
+        pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+      }
+      final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions",
         List.class);
-      final Method convertCipherOptionProtosMethod = PBHelper.class
+      final Method convertCipherOptionProtosMethod = pbHelperClass
           .getMethod("convertCipherOptionProtos", List.class);
       final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class
           .getMethod("addAllCipherOption", Iterable.class);
-- 
1.9.1
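
The heart of the patch is createBlockAdder: ClientProtocol.addBlock gained a trailing
addBlockFlags parameter in Hadoop 2.8, so the right overload is selected once, at
class-initialization time, by checking whether the last parameter type is String[] (the
pre-2.8 favoredNodes). Below is a minimal, self-contained sketch of that idiom; the toy
OldProtocol and NewProtocol interfaces and their string results are stand-ins for
illustration only, not Hadoop APIs.

import java.lang.reflect.Method;

// Toy stand-ins for the two generations of the namenode protocol (illustrative only).
interface OldProtocol { String addBlock(String src, String[] favoredNodes); }
interface NewProtocol { String addBlock(String src, String[] favoredNodes, Object addBlockFlags); }

public class OverloadProbe {

  // One stable signature for callers, like the patch's BlockAdder interface.
  interface BlockAdder {
    String addBlock(Object namenode, String src, String[] favoredNodes) throws Exception;
  }

  static BlockAdder createBlockAdder(Class<?> protocolClass) {
    for (Method method : protocolClass.getMethods()) {
      if (method.getName().equals("addBlock")) {
        final Method addBlockMethod = method;
        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
        if (paramTypes[paramTypes.length - 1] == String[].class) {
          // Old shape: favoredNodes is the trailing parameter.
          return (namenode, src, favoredNodes) ->
              (String) addBlockMethod.invoke(namenode, src, favoredNodes);
        }
        // New shape: pass null for the trailing addBlockFlags parameter.
        return (namenode, src, favoredNodes) ->
            (String) addBlockMethod.invoke(namenode, src, favoredNodes, null);
      }
    }
    throw new Error("No addBlock method found for " + protocolClass.getName());
  }

  public static void main(String[] args) throws Exception {
    OldProtocol oldNn = (src, favored) -> "old:" + src;
    NewProtocol newNn = (src, favored, flags) -> "new:" + src;
    System.out.println(createBlockAdder(OldProtocol.class).addBlock(oldNn, "/wal", null)); // old:/wal
    System.out.println(createBlockAdder(NewProtocol.class).addBlock(newNn, "/wal", null)); // new:/wal
  }
}

Resolving the overload once and caching the adapter in a static final field (BLOCK_ADDER in
the patch) keeps the reflection lookup out of the per-block write path.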
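
The same probe-and-fall-back idea handles classes that moved between releases: the proto
conversion methods migrated from PBHelper to PBHelperClient in Hadoop 2.8, so both
createPBHelper and the SASL helper try the new name first via Class.forName and fall back
to the old one. A runnable sketch of that lookup follows; resolve and the class names
passed to it in main are illustrative assumptions, using JDK classes so the sketch is
self-contained.

import java.lang.reflect.Method;

public class ClassProbe {

  // Probe for the preferred (newer) class name, falling back to the older one,
  // mirroring the patch's PBHelperClient -> PBHelper lookup.
  static Class<?> resolve(String preferred, String fallback) {
    try {
      return Class.forName(preferred);
    } catch (ClassNotFoundException e) {
      try {
        return Class.forName(fallback);
      } catch (ClassNotFoundException e2) {
        throw new Error("Neither " + preferred + " nor " + fallback + " found", e2);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // The first name does not exist, so resolve() falls back to the second,
    // just as the patch falls back on a Hadoop 2.7- classpath.
    Class<?> helper = resolve("java.util.NoSuchHelper", "java.util.Collections");
    Method emptyList = helper.getMethod("emptyList");
    System.out.println(emptyList.invoke(null)); // prints []
  }
}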
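
Finally, createChecksumCreater27 shows how the pre-2.8 path reaches the non-public
DFSClient$Conf.createChecksum(): look the Method up with getDeclaredMethod, call
setAccessible(true) once, and cache it. A tiny sketch against a hypothetical ToyConf class
(not a Hadoop type):

import java.lang.reflect.Method;

public class PrivateMethodProbe {

  // Toy class with a non-public method, standing in for DFSClient$Conf (illustrative only).
  static class ToyConf {
    private String createChecksum() { return "CRC32C"; }
  }

  public static void main(String[] args) throws Exception {
    Method m = ToyConf.class.getDeclaredMethod("createChecksum");
    m.setAccessible(true); // done once; the Method object is then cached and reused
    System.out.println(m.invoke(new ToyConf())); // prints CRC32C
  }
}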