diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 4c77842..baeafad 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.hive.hcatalog.streaming;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -342,6 +343,11 @@ public Void run() throws Exception {
             return null;
           }
         } );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       } catch (IOException e) {
         LOG.error("Error closing connection to " + endPt, e);
       } catch (InterruptedException e) {
@@ -937,6 +943,11 @@ public Void run() throws StreamingException {
             }
           }
         } );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
             "' on endPoint :" + endPt, e);
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index fbf5481..37750c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -224,6 +224,11 @@ public Object run() throws Exception {
             return null;
           }
         });
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
       }
       txnHandler.markCleaned(ci);
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 8495c66..4d6e24e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -174,6 +174,11 @@ public Object run() throws Exception {
           return null;
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
 
       if (wrapper.size() == 1) {
         LOG.debug("Running job as " + wrapper.get(0));
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 3705a34..c01fdcd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -213,12 +213,18 @@ private CompactionType checkForCompaction(final CompactionInfo ci,
       LOG.info("Going to initiate as user " + runAs);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
           UserGroupInformation.getLoginUser());
-      return ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
+      CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
         @Override
         public CompactionType run() throws Exception {
           return determineCompactionType(ci, txns, sd);
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
+      return compactionType;
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index adffa8c..7a3294a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -34,8 +35,6 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -173,6 +172,11 @@ public Object run() throws Exception {
           return null;
         }
       });
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
     }
     txnHandler.markCompacted(ci);
   } catch (Exception e) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f4debfe..258a734 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.hive.common.util.ReflectionUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,13 +52,14 @@
 import org.junit.rules.TestName;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static junit.framework.Assert.assertTrue;
+
 /**
  * TODO: this should be merged with TestTxnCommands once that is checked in
  * specifically the tests; the supporting code here is just a clone of TestTxnCommands
@@ -561,6 +563,50 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception {
     Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
     Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
   }
+
+  /**
+   * Make sure there's no FileSystem$Cache$Key leak due to UGI use
+   * @throws Exception
+   */
+  @Test
+  public void testFileSystemUnCaching() throws Exception {
+    int cacheSizeBefore;
+    int cacheSizeAfter;
+
+    // get the size of cache BEFORE
+    cacheSizeBefore = getFileSystemCacheSize();
+
+    // Insert a row to ACID table
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+
+    // Perform a major compaction
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    // get the size of cache AFTER
+    cacheSizeAfter = getFileSystemCacheSize();
+
+    Assert.assertEquals(cacheSizeBefore, cacheSizeAfter);
+  }
+
+  private int getFileSystemCacheSize() throws Exception {
+    try {
+      Field cache = FileSystem.class.getDeclaredField("CACHE");
+      cache.setAccessible(true);
+      Object o = cache.get(null); // FileSystem.CACHE
+
+      Field mapField = o.getClass().getDeclaredField("map");
+      mapField.setAccessible(true);
+      Map map = (HashMap)mapField.get(o); // FileSystem.CACHE.map
+
+      return map.size();
+    } catch (NoSuchFieldException e) {
+      System.out.println(e);
+    }
+    return 0;
+  }
+
   private static class CompactionsByState {
     private int attempted;
     private int failed;
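
Every hunk above applies the same pattern: run the HDFS work inside ugi.doAs(...), then call FileSystem.closeAllForUGI(ugi) so the FileSystem instances cached for that UGI are closed and evicted. Hadoop's FileSystem.CACHE keys entries on (scheme, authority, UGI), so each freshly created proxy UGI otherwise leaves a new cache entry behind for the life of the process. Below is a minimal sketch of the idea, assuming only stock Hadoop APIs; the helper name runAsAndRelease is illustrative, and the try/finally placement is a hardening beyond the patch, which invokes closeAllForUGI right after doAs returns.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiFileSystemCleanup {
  /**
   * Runs an action as a proxy user, then evicts the FileSystem instances
   * that Hadoop's FileSystem cache created for that UGI.
   */
  public static <T> T runAsAndRelease(String user, PrivilegedExceptionAction<T> action)
      throws IOException, InterruptedException {
    UserGroupInformation ugi =
        UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
    try {
      return ugi.doAs(action);
    } finally {
      // Release the cached FileSystem handles even if doAs throws.
      FileSystem.closeAllForUGI(ugi);
    }
  }
}

Sweeping by UGI rather than closing individual FileSystem objects matters here: the compactor threads cannot see every handle the delegated code may have opened, while closeAllForUGI closes all of them for that UGI in one call.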
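The new test reads FileSystem.CACHE through reflection and asserts that its size does not grow across a compaction. As a hypothetical stand-alone illustration of the behavior being asserted (not part of the patch; the proxy user name "hive" is made up), the following shows how a proxy UGI creates a cache entry that only closeAllForUGI removes:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class CacheGrowthDemo {
  public static void main(String[] args) throws Exception {
    // Each distinct proxy UGI gets its own cached FileSystem instance,
    // because the cache key includes the UGI.
    UserGroupInformation proxy =
        UserGroupInformation.createProxyUser("hive", UserGroupInformation.getLoginUser());
    FileSystem fs = proxy.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws Exception {
        return FileSystem.get(new Configuration()); // adds a cache entry keyed by 'proxy'
      }
    });
    // ... do work with fs ...
    FileSystem.closeAllForUGI(proxy); // closes fs and removes its cache entry
  }
}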