From 1a6bb53835c992aecf3ee11aceb416581c85f52e Mon Sep 17 00:00:00 2001 From: "chance.li" Date: Sat, 25 Nov 2017 20:05:20 +0800 Subject: [PATCH] HBASE-19344 improve asyncWAL by using Independent thread for netty #IO in FanOutOneBlockAsyncDFSOutput --- .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 5 ++- .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 7 ++-- .../FanOutOneBlockAsyncDFSOutputHelper.java | 14 ++++--- .../hadoop/hbase/regionserver/HRegionServer.java | 14 +++++-- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 12 ++++-- .../regionserver/wal/AsyncProtobufLogWriter.java | 8 +++- .../wal/SecureAsyncProtobufLogWriter.java | 6 ++- .../hbase/util/NettyEventLoopGroupConfig.java | 18 +++++++++ .../hadoop/hbase/wal/AsyncFSWALProvider.java | 43 ++++++++++++++-------- .../hbase/wal/NettyAsyncFSWALConfigHelper.java | 2 +- .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java | 18 +++++---- .../hbase/io/asyncfs/TestLocalAsyncOutput.java | 4 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 4 +- .../hbase/regionserver/wal/TestAsyncFSWAL.java | 6 ++- .../regionserver/wal/TestAsyncProtobufLog.java | 4 +- .../hbase/regionserver/wal/TestAsyncWALReplay.java | 5 ++- 16 files changed, 119 insertions(+), 51 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 1f5462f921..507c2f659b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -57,11 +58,11 @@ public final class AsyncFSOutputHelper { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoop eventLoop, - Class channelClass) + EventLoopGroup eventLoopGroup4Fanout, Class channelClass) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoop, channelClass); + overwrite, createParent, replication, blockSize, eventLoop, eventLoopGroup4Fanout, channelClass); } final FSDataOutputStream fsOut; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 3daf15b299..67bc44c1c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.asyncfs; +import java.util.concurrent.ConcurrentLinkedDeque; import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; @@ -153,13 +154,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.unfinishedReplicas = Collections.emptySet(); } else { this.unfinishedReplicas = - Collections.newSetFromMap(new IdentityHashMap(replicas.size())); + Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap(replicas.size()))); this.unfinishedReplicas.addAll(replicas); } } } - private final Deque waitingAckQueue = new ArrayDeque<>(); + private final Deque waitingAckQueue = new ConcurrentLinkedDeque<>(); // this could be different from acked block length because a packet can not start at the middle of // a chunk. @@ -180,7 +181,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private State state; - private void completed(Channel channel) { + private synchronized void completed(Channel channel) { if (waitingAckQueue.isEmpty()) { return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index d3dc957ec2..0c9c6541f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -666,7 +667,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static List> connectToDataNodes(Configuration conf, DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, - BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop, + BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup4Fanout, Class channelClass) { Enum[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); @@ -690,6 +691,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { for (int i = 0; i < datanodeInfos.length; i++) { DatanodeInfo dnInfo = datanodeInfos[i]; Enum storageType = storageTypes[i]; + EventLoop eventLoop = eventLoopGroup4Fanout.next(); Promise promise = eventLoop.newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); @@ -732,7 +734,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoop eventLoop, Class channelClass) throws IOException { + EventLoop eventLoop, EventLoopGroup eventLoopGroup4Fanout, + Class channelClass) throws IOException { Configuration conf = dfs.getConf(); FSUtils fsUtils = FSUtils.getInstance(dfs, conf); DFSClient client = dfs.getClient(); @@ -761,7 +764,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { stat.getFileId(), null); List datanodeList = new ArrayList<>(); futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, - PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass); + PIPELINE_SETUP_CREATE, summer, eventLoopGroup4Fanout, channelClass); for (Future future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. @@ -801,14 +804,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { */ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoop eventLoop, Class channelClass) throws IOException { + EventLoop eventLoop, EventLoopGroup eventLoopGroup4Fanout, + Class channelClass) throws IOException { return new FileSystemLinkResolver() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoop, channelClass); + blockSize, eventLoop, eventLoopGroup4Fanout, channelClass); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 500f74fcd4..5084c509f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -138,6 +138,8 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Addressing; @@ -664,10 +666,16 @@ public class HRegionServer extends HasThread implements private static NettyEventLoopGroupConfig setupNetty(Configuration conf) { // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL. - NettyEventLoopGroupConfig nelgc = - new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); + NettyEventLoopGroupConfig nelgc = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); - NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass()); + + int workerCount = conf.getInt("hbase.wal.async.netty.worker.count", 1); + + Pair> eventLoopGroupAndChannelClass = + NettyEventLoopGroupConfig.getEventLoopGroup(conf, "AsyncFSWAL-Global-IO", workerCount); + NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupAndChannelClass.getFirst(), + eventLoopGroupAndChannelClass.getSecond()); + return nelgc; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index d4e113a42b..2773e48897 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -143,6 +144,7 @@ public class AsyncFSWAL extends AbstractFSWAL { public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; private final EventLoop eventLoop; + private final EventLoopGroup eventLoopGroup4Fanout; private final Class channelClass; @@ -194,10 +196,11 @@ public class AsyncFSWAL extends AbstractFSWAL { public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, - String prefix, String suffix, EventLoop eventLoop, Class channelClass) - throws FailedLogCloseException, IOException { + String prefix, String suffix, EventLoop eventLoop, EventLoopGroup eventLoopGroup4Fanout, + Class channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoop = eventLoop; + this.eventLoopGroup4Fanout = eventLoopGroup4Fanout; this.channelClass = channelClass; Supplier hasConsumerTask; if (eventLoop instanceof SingleThreadEventExecutor) { @@ -616,8 +619,9 @@ public class AsyncFSWAL extends AbstractFSWAL { boolean overwrite = false; for (int retry = 0;; retry++) { try { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop, - channelClass); + return AsyncFSWALProvider + .createAsyncWriter(conf, fs, path, overwrite, eventLoop, eventLoopGroup4Fanout, + channelClass); } catch (RemoteException e) { LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e); if (shouldRetryCreate(e)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index f3c5bf2617..e5ef3dcb15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; @@ -55,6 +56,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); private final EventLoop eventLoop; + private final EventLoopGroup eventLoopGroup4Fanout; private final Class channelClass; @@ -103,8 +105,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private OutputStream asyncOutputWrapper; - public AsyncProtobufLogWriter(EventLoop eventLoop, Class channelClass) { + public AsyncProtobufLogWriter(EventLoop eventLoop, EventLoopGroup eventLoopGroup4Fanout, + Class channelClass) { this.eventLoop = eventLoop; + this.eventLoopGroup4Fanout = eventLoopGroup4Fanout; this.channelClass = channelClass; } @@ -156,7 +160,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoop, channelClass); + blockSize, eventLoop, eventLoopGroup4Fanout, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java index a686a1b1e0..0d437657ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.crypto.Encryptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; @@ -33,8 +34,9 @@ public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter { private Encryptor encryptor = null; - public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class channelClass) { - super(eventLoop, channelClass); + public SecureAsyncProtobufLogWriter(EventLoop eventLoop, EventLoopGroup eventLoopGroup, + Class channelClass) { + super(eventLoop, eventLoopGroup, channelClass); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java index 1d8b17691d..71bc08fa6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java @@ -68,6 +68,24 @@ public class NettyEventLoopGroupConfig { } } + public static Pair> getEventLoopGroup(Configuration conf, + String threadPoolName, int workerCount) { + boolean useEpoll = useEpoll(conf); + ThreadFactory eventLoopThreadFactory = + new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY); + EventLoopGroup group; + Class channelClass; + if (useEpoll) { + group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory); + channelClass = EpollSocketChannel.class; + } else { + group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory); + channelClass = NioSocketChannel.class; + } + + return new Pair<>(group, channelClass); + } + public EventLoopGroup group() { return group; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 430413748a..9636e9276e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -17,14 +17,13 @@ */ package org.apache.hadoop.hbase.wal; +import java.util.concurrent.ThreadFactory; import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.DefaultEventLoop; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; @@ -33,7 +32,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; @@ -62,6 +62,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { } private EventLoopGroup eventLoopGroup; + private EventLoop asyncWalEventLoop; private Class channelClass; @Override @@ -70,20 +71,30 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, - eventLoopGroup.next(), channelClass); + asyncWalEventLoop, eventLoopGroup, channelClass); } @Override protected void doInit(Configuration conf) throws IOException { - Pair> eventLoopGroupAndChannelClass = + + ThreadFactory threadFactory = + new DefaultThreadFactory("AsyncFSWAL-Handler", true, Thread.MAX_PRIORITY); + + asyncWalEventLoop = new DefaultEventLoop(threadFactory); + + Pair> eventLoopGroupAndChannelClassGloble = NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); - if (eventLoopGroupAndChannelClass != null) { - eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); - channelClass = eventLoopGroupAndChannelClass.getSecond(); + if (eventLoopGroupAndChannelClassGloble != null) { + eventLoopGroup = eventLoopGroupAndChannelClassGloble.getFirst(); + channelClass = eventLoopGroupAndChannelClassGloble.getSecond(); } else { - eventLoopGroup = new NioEventLoopGroup(1, - new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); - channelClass = NioSocketChannel.class; + int workerCount = conf.getInt("hbase.wal.async.netty.worker.count", 1); + Pair> eventLoopGroupAndChannelClassPerWal = + NettyEventLoopGroupConfig.getEventLoopGroup(conf, "AsyncFSWAL-IO", workerCount); + + assert eventLoopGroupAndChannelClassPerWal != null; + eventLoopGroup = eventLoopGroupAndChannelClassPerWal.getFirst(); + channelClass = eventLoopGroupAndChannelClassPerWal.getSecond(); } } @@ -91,14 +102,16 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { * public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, EventLoop eventLoop, Class channelClass) + boolean overwritable, EventLoop eventLoop, EventLoopGroup eventLoopGroup4Fanout, + Class channelClass) throws IOException { // Configuration already does caching for the Class lookup. Class logWriterClass = conf.getClass( "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); try { - AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class) - .newInstance(eventLoop, channelClass); + AsyncWriter writer = + logWriterClass.getConstructor(EventLoop.class, EventLoopGroup.class, Class.class) + .newInstance(eventLoop, eventLoopGroup4Fanout, channelClass); writer.init(fs, path, conf, overwritable); return writer; } catch (Exception e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java index 34ef3f0904..d2f08c8b05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java @@ -51,7 +51,7 @@ public class NettyAsyncFSWALConfigHelper { Preconditions.checkNotNull(channelClass, "channel class is null"); conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME); EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME, - Pair.> newPair(group, channelClass)); + Pair.>newPair(group, channelClass)); } static Pair> getEventLoopConfig(Configuration conf) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 4377196cce..a604316cd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -66,6 +66,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { private static DistributedFileSystem FS; private static EventLoopGroup EVENT_LOOP_GROUP; + private static EventLoopGroup EVENT_LOOP_GROUP_4FanOUt; private static Class CHANNEL_CLASS; @@ -80,6 +81,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); + EVENT_LOOP_GROUP_4FanOUt = new NioEventLoopGroup(3); CHANNEL_CLASS = NioSocketChannel.class; } @@ -99,7 +101,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { try { FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(), - EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); + EVENT_LOOP_GROUP.next(), EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); out.close(); break; } catch (IOException e) { @@ -129,7 +131,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); writeAndVerify(eventLoop, FS, f, out); } @@ -138,7 +140,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt,CHANNEL_CLASS); out.guess(5 * 1024); assertEquals(8 * 1024, out.guess(5 * 1024)); assertEquals(16 * 1024, out.guess(10 * 1024)); @@ -153,7 +155,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); @@ -186,7 +188,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(eventLoop, FS, f, out); @@ -201,7 +203,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); @@ -226,7 +228,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); fail("should fail with connection error"); } catch (IOException e) { LOG.info("expected exception caught", e); @@ -246,7 +248,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); + false, (short) 3, 1024 * 1024 * 1024, eventLoop, EVENT_LOOP_GROUP_4FanOUt, CHANNEL_CLASS); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); out.write(b); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index b0d689c883..d61cbbdd9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -44,6 +44,7 @@ import org.junit.experimental.categories.Category; public class TestLocalAsyncOutput { private static EventLoopGroup GROUP = new NioEventLoopGroup(); + private static EventLoopGroup GROUP_4Fanout = new NioEventLoopGroup(3); private static Class CHANNEL_CLASS = NioSocketChannel.class; @@ -53,6 +54,7 @@ public class TestLocalAsyncOutput { public static void tearDownAfterClass() throws IOException { TEST_UTIL.cleanupTestDir(); GROUP.shutdownGracefully(); + GROUP_4Fanout.shutdownGracefully(); } @Test @@ -61,7 +63,7 @@ public class TestLocalAsyncOutput { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), GROUP_4Fanout, CHANNEL_CLASS); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index f1ecd3a953..6ab49de000 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -84,6 +84,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static DistributedFileSystem FS; private static EventLoopGroup EVENT_LOOP_GROUP; + private static EventLoopGroup EVENT_LOOP_GROUP_4Fanout; private static Class CHANNEL_CLASS; @@ -170,6 +171,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); + EVENT_LOOP_GROUP_4Fanout = new NioEventLoopGroup(3); CHANNEL_CLASS = NioSocketChannel.class; TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); @@ -248,7 +250,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { Path f = getTestFile(); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 1, FS.getDefaultBlockSize(), eventLoop, EVENT_LOOP_GROUP_4Fanout, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 2ae916f679..1f64becf70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -42,12 +42,14 @@ import org.junit.experimental.categories.Category; public class TestAsyncFSWAL extends AbstractTestFSWAL { private static EventLoopGroup GROUP; + private static EventLoopGroup GROUP4Fanout; private static Class CHANNEL_CLASS; @BeforeClass public static void setUpBeforeClass() throws Exception { GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL")); + GROUP4Fanout = new NioEventLoopGroup(3, Threads.newDaemonThreadFactory("TestForFanout")); CHANNEL_CLASS = NioSocketChannel.class; AbstractTestFSWAL.setUpBeforeClass(); } @@ -63,7 +65,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next(), CHANNEL_CLASS); + suffix, GROUP.next(), GROUP4Fanout, CHANNEL_CLASS); } @Override @@ -72,7 +74,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { boolean failIfWALExists, String prefix, String suffix, final Runnable action) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next(), CHANNEL_CLASS) { + suffix, GROUP.next(), GROUP4Fanout, CHANNEL_CLASS) { @Override void atHeadOfRingBufferEventHandlerAppend() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 708d64c346..b5df3bcd25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -43,12 +43,14 @@ import org.junit.experimental.categories.Category; public class TestAsyncProtobufLog extends AbstractTestProtobufLog { private static EventLoopGroup EVENT_LOOP_GROUP; + private static EventLoopGroup EVENT_LOOP_GROUP_4Fanout; private static Class CHANNEL_CLASS; @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); + EVENT_LOOP_GROUP_4Fanout = new NioEventLoopGroup(3); CHANNEL_CLASS = NioSocketChannel.class; AbstractTestProtobufLog.setUpBeforeClass(); } @@ -62,7 +64,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog CHANNEL_CLASS; @BeforeClass public static void setUpBeforeClass() throws Exception { GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay")); + GROUP_4Fanout = new NioEventLoopGroup(3, Threads.newDaemonThreadFactory("TestAsyncWALReplayFanout")); CHANNEL_CLASS = NioSocketChannel.class; Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); @@ -62,6 +64,7 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @Override protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, - HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS); + HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), GROUP_4Fanout, + CHANNEL_CLASS); } } -- 2.13.6 (Apple Git-96)