From 0537e3199ec3366cec883fa44043e8d2b68ee264 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Wed, 17 Feb 2016 12:50:41 +0530 Subject: [PATCH] HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT --- .../hadoop/hbase/io/FSDataInputStreamWrapper.java | 60 ++++++++++++++++++++++ .../org/apache/hadoop/hbase/io/hfile/HFile.java | 17 ++++-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 18 +++++++ .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 9 ++++ 4 files changed, 99 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index b06be6b..71c2c42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +36,8 @@ import com.google.common.annotations.VisibleForTesting; * see method comments. */ public class FSDataInputStreamWrapper { + private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class); + private final HFileSystem hfs; private final Path path; private final FileLink link; @@ -74,6 +80,11 @@ public class FSDataInputStreamWrapper { // reads without hbase checksum verification. private volatile int hbaseChecksumOffCount = -1; + private static Boolean instanceOfCanUnbuffer = null; + // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation + // errors against Hadoop pre 2.6.4 and 2.7.1 versions. + private static Method unbuffer = null; + public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { this(fs, null, path, false); } @@ -219,4 +230,53 @@ public class FSDataInputStreamWrapper { public HFileSystem getHfs() { return this.hfs; } + + /** + * This will free sockets and file descriptors held by the stream only when the stream implements + * org.apache.hadoop.fs.CanUnbuffer. + */ + @SuppressWarnings("rawtypes") + public void unbuffer() { + FSDataInputStream stream = this.useHBaseChecksum ? this.streamNoFsChecksum : this.stream; + if (stream != null) { + InputStream wrappedStream = stream.getWrappedStream(); + // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop + // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the + // CanUnbuffer interface or not and based on that call the unbuffer api. + final Class streamClass = wrappedStream.getClass(); + if (instanceOfCanUnbuffer == null) { + // To ensure we compute whether the stream is instance of CanUnbuffer only once. + instanceOfCanUnbuffer = false; + Class[] streamInterfaces = streamClass.getInterfaces(); + for (Class c : streamInterfaces) { + if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) { + try { + unbuffer = streamClass.getDeclaredMethod("unbuffer"); + } catch (Exception e) { + LOG.error("Failed to find 'unbuffer' method in class " + streamClass, e); + return; + } + instanceOfCanUnbuffer = true; + break; + } + } + } + if (instanceOfCanUnbuffer) { + try { + if (unbuffer == null) { + try { + unbuffer = streamClass.getDeclaredMethod("unbuffer"); + } catch (Exception e) { + LOG.error("Failed to find 'unbuffer' method in class " + streamClass, e); + return; + } + } + unbuffer.invoke(wrappedStream); + } catch (Exception e) { + LOG.error("Failed to unbuffer the stream so possibly there may be a TCP socket " + + "connection left open in CLOSE_WAIT state for this RegionServer.", e); + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 1e1835f..bafc117 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -474,6 +474,11 @@ public class HFile { @VisibleForTesting boolean prefetchComplete(); + + /** + * To close the stream's socket. HBASE-9393 + */ + void unbufferStream(); } /** @@ -490,8 +495,8 @@ public class HFile { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH", justification="Intentional") - private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, - long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { + private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size, + CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { FixedFileTrailer trailer = null; try { boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum(); @@ -513,6 +518,8 @@ public class HFile { LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2); } throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t); + } finally { + fsdis.unbuffer(); } } @@ -541,7 +548,7 @@ public class HFile { } else { hfs = (HFileSystem)fs; } - return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf); + return openReader(path, fsdis, size, cacheConf, hfs, conf); } /** @@ -556,7 +563,7 @@ public class HFile { FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf"); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path); - return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), + return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf, stream.getHfs(), conf); } @@ -567,7 +574,7 @@ public class HFile { FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf) throws IOException { FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis); - return pickReaderVersion(path, wrapper, size, cacheConf, null, conf); + return openReader(path, wrapper, size, cacheConf, null, conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index e7a1e5e..5801520 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1311,6 +1311,11 @@ public class HFileBlock implements Cacheable { void setIncludesMemstoreTS(boolean includesMemstoreTS); void setDataBlockEncoder(HFileDataBlockEncoder encoder); + + /** + * To close the stream's socket. HBASE-9393 + */ + void unbufferStream(); } /** @@ -1758,6 +1763,19 @@ public class HFileBlock implements Cacheable { public String toString() { return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext; } + + @Override + public void unbufferStream() { + // To handle concurrent reads, ensure that no other client is accessing the streams while we + // unbuffer it. + if (streamLock.tryLock()) { + try { + this.streamWrapper.unbuffer(); + } finally { + streamLock.unlock(); + } + } + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index b2f5ded..cc31979 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -575,6 +575,10 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public void close() { + if (!pread) { + // For seek + pread stream socket should be closed when the scanner is closed. HBASE-9393 + reader.unbufferStream(); + } this.returnBlocks(true); } @@ -1898,4 +1902,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public int getMajorVersion() { return 3; } + + @Override + public void unbufferStream() { + fsBlockReader.unbufferStream(); + } } -- 1.9.2.msysgit.0