From 67781ae3aa90b86b9e1ef9437a135737a7b5da4b Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 21 Mar 2016 18:40:38 +0800 Subject: [PATCH] HBASE-15495 Connection leak in FanOutOneBlockAsyncDFSOutputHelper --- .../util/FanOutOneBlockAsyncDFSOutputHelper.java | 23 +++++++-- .../util/TestFanOutOneBlockAsyncDFSOutput.java | 59 ++++++++++++++-------- 2 files changed, 57 insertions(+), 25 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java index d34bbb0..ea71701 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java @@ -99,6 +99,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; /** @@ -594,13 +595,15 @@ public class FanOutOneBlockAsyncDFSOutputHelper { beginFileLease(client, src, stat.getFileId()); boolean succ = false; LocatedBlock locatedBlock = null; - List datanodeList = new ArrayList<>(); + List> futureList = null; try { DataChecksum summer = createChecksum(client); locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null); - for (Future future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, - PIPELINE_SETUP_CREATE, summer, eventLoop)) { + List datanodeList = new ArrayList<>(); + futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, + summer, eventLoop); + for (Future future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. 
datanodeList.add(future.syncUninterruptibly().getNow()); @@ -610,8 +613,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper { stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); } finally { if (!succ) { - for (Channel c : datanodeList) { - c.close(); + if (futureList != null) { + for (Future f : futureList) { + f.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + future.getNow().close(); + } + } + }); + } } endFileLease(client, src, stat.getFileId()); fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java index 0e9f42e..ec15b87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java @@ -22,12 +22,11 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -37,9 +36,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import 
org.apache.hadoop.ipc.RemoteException; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -47,6 +45,10 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput { @@ -63,8 +65,6 @@ public class TestFanOutOneBlockAsyncDFSOutput { @BeforeClass public static void setUp() throws Exception { - Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG); - Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG); TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); @@ -83,8 +83,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { throws IOException, InterruptedException, ExecutionException { final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -107,9 +106,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void test() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), 
eventLoop); writeAndVerify(eventLoop, f, out); } @@ -117,13 +115,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); - final FanOutOneBlockAsyncDFSOutputFlushHandler handler = - new FanOutOneBlockAsyncDFSOutputFlushHandler(); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); eventLoop.execute(new Runnable() { @Override @@ -164,9 +160,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. 
writeAndVerify(eventLoop, f, out);
@@ -187,4 +182,42 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
     }
   }
+
+  /**
+   * Checks that a failed {@code createOutput} does not leave peers registered on any datanode.
+   * One datanode is shut down so that pipeline setup fails; afterwards every datanode's
+   * {@code DataXceiverServer#getNumPeers()} (read reflectively) must report zero.
+   */
+  @Test
+  public void testConnectToDatanodeFailed()
+      throws IOException, ClassNotFoundException, NoSuchMethodException, SecurityException,
+      IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+    Class<?> xceiverServerClass = Class
+        .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // make one datanode broken
+    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
+    try {
+      Path f = new Path("/" + name.getMethodName());
+      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+      try {
+        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+          FS.getDefaultBlockSize(), eventLoop);
+        fail("should fail with connection error");
+      } catch (IOException expected) {
+        // Expected: creation must fail while a datanode is down.
+      }
+      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
+        assertEquals(0, numPeersMethod.invoke(dn.getXferServer()));
+      }
+    } finally {
+      // The mini cluster is started once in setUp() and shared by every test in this class,
+      // and the other tests ask for 3 replicas, so the datanode has to come back.
+      // NOTE(review): restartDataNode(int)/waitActive() are MiniDFSCluster calls this patch
+      // does not use elsewhere -- confirm they are available in the targeted Hadoop versions.
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      TEST_UTIL.getDFSCluster().waitActive();
+    }
+  }
 }
-- 
1.9.1