From bc16e3ac700b825de8f1c2bea12c813e0a67ce45 Mon Sep 17 00:00:00 2001 From: Zach York Date: Mon, 4 Dec 2017 12:11:21 -0800 Subject: [PATCH] HBASE-19435 Reopen Files for ClosedChannelException in BucketCache --- .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 28 +++++++++++++++++++++- .../hbase/io/hfile/bucket/TestFileIOEngine.java | 15 ++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 3419587..cb454d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -19,12 +19,15 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.Arrays; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -108,6 +111,17 @@ public class FileIOEngine implements IOEngine { return 0; } + @VisibleForTesting + void closeFileChannels() { + for (FileChannel fileChannel: fileChannels) { + try { + fileChannel.close(); + } catch (IOException e) { + LOG.warn("Failed to close FileChannel", e); + } + } + } + /** * Transfers data from the given byte buffer to file * @param srcBuffer the given byte buffer from which bytes are to be read @@ -169,11 +183,18 @@ public class FileIOEngine implements IOEngine { int bufLimit = buffer.limit(); while (true) { FileChannel fileChannel = fileChannels[accessFileNum]; + int accessLen = 0; if (endFileNum > accessFileNum) { // short the limit; buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); } - int accessLen = accessor.access(fileChannel, buffer, accessOffset); + try { + accessLen = accessor.access(fileChannel, buffer, accessOffset); + } catch (ClosedChannelException e) { + LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e); + refreshFileConnection(accessFileNum); + continue; + } // recover the limit buffer.limit(bufLimit); if (accessLen < remainingAccessDataLen) { @@ -213,6 +234,11 @@ public class FileIOEngine implements IOEngine { return fileNum; } + private void refreshFileConnection(int accessFileNum) throws FileNotFoundException { + rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); + fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); + } + private static interface FileAccessor { int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index a03818b..adf7fd0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -114,4 +114,19 @@ public class TestFileIOEngine { fileIOEngine.read(ByteBuffer.wrap(data2), 0); assertArrayEquals(data1, data2); } + + @Test + public void testClosedChannelException() throws IOException { + fileIOEngine.closeFileChannels(); + int len = 5; + long offset = 0L; + byte[] data1 = new byte[len]; + for (int j = 0; j < data1.length; ++j) { + data1[j] = (byte) (Math.random() * 255); + } + byte[] data2 = new byte[len]; + fileIOEngine.write(ByteBuffer.wrap(data1), offset); + fileIOEngine.read(ByteBuffer.wrap(data2), offset); + assertArrayEquals(data1, data2); + } } -- 2.6.4