From 7b785ad4038fb9dbf7f56d8493c9281542aeb25b Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Tue, 20 Mar 2018 18:10:15 +0530 Subject: [PATCH] HBASE-15291 FileSystem not closed in secure bulkLoad --- .../hbase/regionserver/SecureBulkLoadManager.java | 40 +++++++++++++++++----- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 264d9858de..b57a27f5e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -145,15 +145,26 @@ public class SecureBulkLoadManager { public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request) throws IOException { - region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); + try { + region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser()); - Path path = new Path(request.getBulkToken()); - if (!fs.delete(path, true)) { - if (fs.exists(path)) { - throw new IOException("Failed to clean up " + path); + Path path = new Path(request.getBulkToken()); + if (!fs.delete(path, true)) { + if (fs.exists(path)) { + throw new IOException("Failed to clean up " + path); + } + } + LOG.info("Cleaned up " + path + " successfully."); + } finally { + UserGroupInformation ugi = getActiveUser().getUGI(); + try { + if (!UserGroupInformation.getLoginUser().equals(ugi)) { + FileSystem.closeAllForUGI(ugi); + } + } catch (IOException e) { + LOG.error("Failed to close FileSystem for: " + ugi, e); } } - LOG.info("Cleaned up " + path + " successfully."); } public Map> secureBulkLoadHFiles(final HRegion region, @@ -304,7 +315,7 @@ public class SecureBulkLoadManager { } if (srcFs == null) { - srcFs = FileSystem.get(p.toUri(), conf); + srcFs = FileSystem.newInstance(p.toUri(), conf); } if(!isFile(p)) { @@ -334,11 +345,20 @@ public class SecureBulkLoadManager { @Override public void doneBulkLoad(byte[] family, String srcPath) throws IOException { LOG.debug("Bulk Load done for: " + srcPath); + closeSrcFs(); + } + + private void closeSrcFs() throws IOException { + if (srcFs != null) { + srcFs.close(); + srcFs = null; + } } @Override public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { + closeSrcFs(); // files are copied so no need to move them back return; } @@ -350,12 +370,15 @@ public class SecureBulkLoadManager { // prepare stage, so no need of rename here again if (p.equals(stageP)) { LOG.debug(p.getName() + " is already available in source directory. Skipping rename."); + closeSrcFs(); return; } LOG.debug("Moving " + stageP + " back to " + p); - if(!fs.rename(stageP, p)) + if (!fs.rename(stageP, p)) { + closeSrcFs(); throw new IOException("Failed to move HFile: " + stageP + " to " + p); + } // restore original permission if (origPermissions.containsKey(srcPath)) { @@ -363,6 +386,7 @@ public class SecureBulkLoadManager { } else { LOG.warn("Can't find previous permission for path=" + srcPath); } + closeSrcFs(); } /** -- 2.15.1.windows.2