From d315dad364622c02086d96518743c7e610c21d78 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Thu, 28 Jan 2016 14:23:26 +0530 Subject: [PATCH] HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT --- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 34 ++++++++++++++++++---- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 20 ++++++++++++- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 9 ++++++ 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 1e1835f..03d681e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -41,6 +41,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -474,6 +475,11 @@ public class HFile { @VisibleForTesting boolean prefetchComplete(); + + /** + * To close only the stream's socket. HBASE-9393 + */ + void unbufferStream(); } /** @@ -490,8 +496,8 @@ public class HFile { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH", justification="Intentional") - private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, - long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { + private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size, + CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { FixedFileTrailer trailer = null; try { boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum(); @@ -513,6 +519,22 @@ public class HFile { LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2); } throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t); + } finally { + unbufferStream(fsdis); + } + } + + static void unbufferStream(FSDataInputStreamWrapper fsdis) { + boolean useHBaseChecksum = fsdis.shouldUseHBaseChecksum(); + final FSDataInputStream stream = fsdis.getStream(useHBaseChecksum); + if (stream != null && stream.getWrappedStream() instanceof CanUnbuffer) { + // Enclosing unbuffer() in try-catch just to be on defensive side. + try { + stream.unbuffer(); + } catch (Throwable e) { + LOG.error("Failed to unbuffer the stream so possibly there may be a TCP socket connection " + + "left open in CLOSE_WAIT state.", e); + } } } @@ -541,7 +563,7 @@ public class HFile { } else { hfs = (HFileSystem)fs; } - return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf); + return openReader(path, fsdis, size, cacheConf, hfs, conf); } /** @@ -556,8 +578,8 @@ public class HFile { FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf"); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path); - return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), - cacheConf, stream.getHfs(), conf); + return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf, stream.getHfs(), + conf); } /** @@ -567,7 +589,7 @@ public class HFile { FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf) throws IOException { FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis); - return pickReaderVersion(path, wrapper, size, cacheConf, null, conf); + return openReader(path, wrapper, size, cacheConf, null, conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 0a25825..b4b5f81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -33,10 +33,10 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -1311,6 +1311,11 @@ public class HFileBlock implements Cacheable { void setIncludesMemstoreTS(boolean includesMemstoreTS); void setDataBlockEncoder(HFileDataBlockEncoder encoder); + + /** + * To close only the stream's socket. HBASE-9393 + */ + void unbufferStream(); } /** @@ -1756,6 +1761,19 @@ public class HFileBlock implements Cacheable { public String toString() { return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext; } + + @Override + public void unbufferStream() { + // To handle concurrent reads, ensure that no other client is accessing the streams while we + // unbuffer it. + if (streamLock.tryLock()) { + try { + HFile.unbufferStream(this.streamWrapper); + } finally { + streamLock.unlock(); + } + } + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4db26d1..8fe5ed1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -570,6 +570,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public void close() { + if (!pread) { + // For seek + pread stream socket should be closed when the scanner is closed. HBASE-9393 + reader.unbufferStream(); + } this.returnBlocks(true); } @@ -1891,4 +1895,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public int getMajorVersion() { return 3; } + + @Override + public void unbufferStream() { + fsBlockReader.unbufferStream(); + } } -- 1.9.2.msysgit.0