diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 0e5cf81..a00535d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -28,7 +28,9 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStag
 import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
 import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.CodedOutputStream;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -201,7 +204,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static final ChecksumCreater CHECKSUM_CREATER;
 
   // helper class for creating files.
-  private interface FileCreator {
+  @VisibleForTesting
+  interface FileCreator {
     default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
         String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
         short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
@@ -758,12 +762,21 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
           FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
           new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
           createParent, replication, blockSize, CryptoProtocolVersion.supported());
-      } catch (Exception e) {
-        if (e instanceof RemoteException) {
-          throw (RemoteException) e;
-        } else {
-          throw new NameNodeException(e);
+      } catch (RemoteException re) {
+        IOException e = re.unwrapRemoteException();
+        if (e instanceof RetryStartFileException) {
+          if (retry < createMaxRetries) {
+            LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+            // the enclosing for loop increments retry on continue
+            continue;
+          } else {
+            throw new IOException(
+              "Too many retries because of encryption zone operations.", e);
+          }
         }
+        throw re;
+      } catch (Exception e) {
+        throw new NameNodeException(e);
       }
       beginFileLease(client, stat.getFileId());
       boolean succ = false;
@@ -791,15 +804,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
           stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
         succ = true;
         return output;
-      } catch (RemoteException e) {
-        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
-        if (shouldRetryCreate(e)) {
-          if (retry >= createMaxRetries) {
-            throw e.unwrapRemoteException();
-          }
-        } else {
-          throw e.unwrapRemoteException();
-        }
       } catch (IOException e) {
         LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
         if (retry >= createMaxRetries) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 6be44e9..c9cdfbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
@@ -29,12 +30,14 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,8 +48,10 @@ import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -239,4 +244,63 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     }
     assertArrayEquals(b, actual);
   }
+
+  static int createRetryCounter = 0;
+  private static final int ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES_FOR_TEST = 3;
+  private static FanOutOneBlockAsyncDFSOutputHelper.FileCreator createFileCreatorForTest(
+      FanOutOneBlockAsyncDFSOutputHelper.FileCreator oldCreator) {
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+        supportedVersions) -> {
+      if (++createRetryCounter < ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES_FOR_TEST) {
+        RetryStartFileException ex = new RetryStartFileException("injected for test");
+        throw new RemoteException(ex.getClass().getName(), ex.getMessage());
+      }
+
+      // use the original FileCreator to send the RPC to the NameNode;
+      // it should succeed this time
+      return oldCreator.createObject(instance, src, masked, clientName, flag, createParent,
+        replication, blockSize, supportedVersions);
+    };
+  }
+
+  @Test
+  public void testRetryStartFileException()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
+    // use reflection to forcefully change the static final FILE_CREATOR field for the test
+    Field field = FanOutOneBlockAsyncDFSOutputHelper.class.getDeclaredField("FILE_CREATOR");
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    boolean isModifierAccessible = modifiersField.isAccessible();
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+    boolean isAccessible = field.isAccessible();
+    field.setAccessible(true);
+
+    FanOutOneBlockAsyncDFSOutputHelper.FileCreator oldCreator =
+      (FanOutOneBlockAsyncDFSOutputHelper.FileCreator) field.get(null);
+
+    // replace the original FileCreator with the failure-injecting one
+    field.set(null, createFileCreatorForTest(oldCreator));
+
+    try {
+      FS.getConf().setInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+        ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES_FOR_TEST);
+
+      Path f = new Path("/" + name.getMethodName());
+      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+      FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper
+        .createOutput(FS, f, true, false, (short) 3, FS.getDefaultBlockSize(),
+          eventLoop, CHANNEL_CLASS);
+      out.close();
+
+      // verify that the FileCreator was called 3 times: 2 injected failures, then success
+      Assert.assertEquals("create retry attempts",
+        ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES_FOR_TEST, createRetryCounter);
+    } finally {
+      // restore the original static final field
+      field.set(null, oldCreator);
+      field.setAccessible(isAccessible);
+      modifiersField.setAccessible(isModifierAccessible);
+    }
+  }
 }
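
A note on the retry path in the first hunk: HDFS RPCs deliver a server-side failure to the client as a RemoteException that carries only the class name and message of the original exception. That is why the new catch block calls unwrapRemoteException() before testing for RetryStartFileException, and why the test can fake a NameNode-side failure by doing the wrapping itself. The following sketch (not part of the patch; the class, the attemptCreate helper, and the maxRetries value are invented for illustration) shows the same round trip in isolation, assuming Hadoop's RemoteException and RetryStartFileException are on the classpath:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
import org.apache.hadoop.ipc.RemoteException;

public class RetryUnwrapExample {

  // Simulates the RPC layer: the server-side exception travels as class name + message.
  private static void attemptCreate(int attempt) throws RemoteException {
    if (attempt < 2) {
      RetryStartFileException ex = new RetryStartFileException("injected for illustration");
      throw new RemoteException(ex.getClass().getName(), ex.getMessage());
    }
  }

  public static void main(String[] args) throws IOException {
    int maxRetries = 3; // stand-in for the ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES setting
    for (int retry = 0;; retry++) {
      try {
        attemptCreate(retry);
        System.out.println("create succeeded, retry = " + retry);
        return;
      } catch (RemoteException re) {
        // Same unwrap-and-test shape as the new catch block in createOutput():
        // unwrapRemoteException() re-instantiates the named exception class, so
        // an instanceof check works again on the client side.
        IOException e = re.unwrapRemoteException();
        if (e instanceof RetryStartFileException && retry < maxRetries) {
          continue; // transient NameNode condition, e.g. during encryption zone setup
        }
        throw re; // any other remote error, or too many retries: propagate
      }
    }
  }
}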
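
The test swaps FILE_CREATOR, a private static final field, by clearing the FINAL flag through Field's private "modifiers" field. That trick works on the Java 8 toolchain targeted here, but JDK 12+ rejects it unless java.lang.reflect is opened to the test. Here is a minimal, self-contained sketch of the pattern (Holder and ACTION are invented stand-ins for FanOutOneBlockAsyncDFSOutputHelper and FILE_CREATOR):

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class StaticFinalSwapExample {

  static final class Holder {
    // A reference-typed constant: reads compile to getstatic, so a reflective write
    // is observed (primitive or String constants would be inlined by javac instead).
    static final Runnable ACTION = () -> System.out.println("original");
  }

  // Clears the FINAL modifier so Field.set may write the static field.
  static void setStaticFinal(Field field, Object newValue) throws ReflectiveOperationException {
    field.setAccessible(true);
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue);
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Field field = Holder.class.getDeclaredField("ACTION");
    field.setAccessible(true);
    Runnable original = (Runnable) field.get(null);

    setStaticFinal(field, (Runnable) () -> System.out.println("injected"));
    Holder.ACTION.run(); // prints "injected"

    setStaticFinal(field, original); // restore, as the test's finally block does
    Holder.ACTION.run(); // prints "original"
  }
}

Since the patch already relaxes FileCreator to package-private with @VisibleForTesting, an alternative would be to expose a package-private setter (or make FILE_CREATOR non-final) for tests; the reflection approach keeps the production field final at the cost of the JDK-version fragility noted above.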