diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 0dba331..7d3980f 100644
--- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -18,12 +18,40 @@
 
 package org.apache.hadoop.hive.common;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+
 /**
  * Collection of Java class loading/reflection related utilities common across
  * Hive.
  */
 public final class JavaUtils {
 
+  private static final Log LOG = LogFactory.getLog(JavaUtils.class);
+  private static final Method SUN_MISC_UTIL_RELEASE;
+
+  static {
+    if (Closeable.class.isAssignableFrom(URLClassLoader.class)) {
+      SUN_MISC_UTIL_RELEASE = null;
+    } else {
+      Method release = null;
+      try {
+        Class<?> clazz = Class.forName("sun.misc.ClassLoaderUtil");
+        release = clazz.getMethod("releaseLoader", URLClassLoader.class);
+      } catch (Exception e) {
+        // ignore: without sun.misc.ClassLoaderUtil there is no release method to call
+      }
+      SUN_MISC_UTIL_RELEASE = release;
+    }
+  }
+
   /**
    * Standard way of getting classloader in Hive code (outside of Hadoop).
    *
@@ -40,6 +68,60 @@ public static ClassLoader getClassLoader() {
     return classLoader;
   }
 
+  /**
+   * Closes every class loader from {@code current} up to, but not including,
+   * {@code stop}. Nothing is closed unless {@code stop} is a non-null ancestor
+   * of {@code current}. An IOException raised while closing one loader is
+   * logged and the walk continues with its parent.
+   *
+   * @param current innermost class loader to close
+   * @param stop ancestor class loader at which closing stops; it is left open
+   */
+  public static void closeClassLoadersTo(ClassLoader current, ClassLoader stop) {
+    if (!isValidHierarchy(current, stop)) {
+      return;
+    }
+    for (; current != null && current != stop; current = current.getParent()) {
+      try {
+        closeClassLoader(current);
+      } catch (IOException e) {
+        // Only URLClassLoaders expose URLs; a Closeable loader of another type must not be cast.
+        String urls = current instanceof URLClassLoader
+            ? Arrays.toString(((URLClassLoader) current).getURLs()) : "";
+        LOG.info("Failed to close class loader " + current + urls, e);
+      }
+    }
+  }
+
+  // check before closing loaders, not to close app-classloader, etc. by mistake
+  private static boolean isValidHierarchy(ClassLoader current, ClassLoader stop) {
+    if (current == null || stop == null || current == stop) {
+      return false;
+    }
+    for (; current != null && current != stop; current = current.getParent()) {
+    }
+    return current == stop;
+  }
+
+  // best effort to close
+  // see https://issues.apache.org/jira/browse/HIVE-3969 for detail
+  public static void closeClassLoader(ClassLoader loader) throws IOException {
+    if (loader instanceof Closeable) {
+      ((Closeable)loader).close();
+    } else if (SUN_MISC_UTIL_RELEASE != null && loader instanceof URLClassLoader) {
+      try {
+        SUN_MISC_UTIL_RELEASE.invoke(null, loader);
+      } catch (InvocationTargetException e) {
+        if (e.getTargetException() instanceof IOException) {
+          throw (IOException)e.getTargetException();
+        }
+        throw new IOException(e.getTargetException());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
   private JavaUtils() {
     // prevent instantiation
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index a249d74..af25dc8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -96,7 +96,14 @@ protected void closeOp(boolean abort) throws HiveException {
       }
     }
     inputPart.close();
-  }
+    inputPart = null;
+
+    for (PTFInputDef iDef = conf.getFuncDef(); iDef != null; iDef = iDef.getInput()) {
+      if (iDef instanceof PartitionedTableFunctionDef) {
+        ((PartitionedTableFunctionDef)iDef).getTFunction().close();
+      }
+    }
+  }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
index 8d6ebaa..cef24e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
@@ -123,6 +123,13 @@ public void mergeDependency(Operator op,
         old_dep.setExpr(null);
       }
     }
+
+    /**
+     * Clears {@code depMap}.
+     */
+    public void clear() {
+      depMap.clear();
+    }
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 5546d03..bb040b3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
@@ -74,6 +75,8 @@
 public class SessionState {
   private static final Log LOG = LogFactory.getLog(SessionState.class);
 
+  protected ClassLoader parentLoader;
+
   /**
    * current configuration.
    */
@@ -237,6 +240,7 @@ public SessionState(HiveConf conf, String userName) {
     if (StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HIVESESSIONID))) {
       conf.setVar(HiveConf.ConfVars.HIVESESSIONID, makeSessionId());
     }
+    parentLoader = JavaUtils.getClassLoader();
   }
 
   private static final SimpleDateFormat DATE_FORMAT =
@@ -286,8 +290,19 @@ public static SessionState start(HiveConf conf) {
   /**
    * Sets the given session state in the thread local var for sessions.
    */
-  public static void setCurrentSessionState(SessionState session) {
-    tss.set(session);
+  public static void setCurrentSessionState(SessionState startSs) {
+    tss.set(startSs);
+    // A null session only resets the thread-local value; there is no class loader to install.
+    if (startSs != null) {
+      Thread.currentThread().setContextClassLoader(startSs.getConf().getClassLoader());
+    }
+  }
+
+  /**
+   * Removes the session state bound to the current thread.
+   */
+  public static void detachSession() {
+    tss.remove();
   }
 
   /**
@@ -930,6 +945,7 @@ public void setCurrentDatabase(String currentDatabase) {
   }
 
   public void close() throws IOException {
+    JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader);
     File resourceDir =
       new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
     LOG.debug("Removing resource dir " + resourceDir);
@@ -939,6 +955,8 @@ public void close() throws IOException {
       }
     } catch (IOException e) {
       LOG.info("Error removing session resource dir " + resourceDir, e);
+    } finally {
+      detachSession();
     }
 
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
index 32e78ac..080fd44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/TableFunctionEvaluator.java
@@ -137,5 +137,15 @@ public PTFPartition transformRawInput(PTFPartition iPart) throws HiveException {
   protected PTFPartition _transformRawInput(PTFPartition iPart) throws HiveException {
     return null;
   }
+
+  /**
+   * Closes the output partition, if one is held, and drops the reference to it.
+   */
+  public void close() {
+    if (outputPartition != null) {
+      outputPartition.close();
+    }
+    outputPartition = null;
+  }
 }
 
diff --git ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java
index c51ff09..7df7ecc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java
+++ ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java
@@ -88,4 +88,45 @@ public void testClose() throws Exception {
     ss.close();
     assertNull(ss.getTezSession());
   }
+
+  class RegisterJarRunnable implements Runnable {
+    String jar;
+    ClassLoader loader;
+    SessionState ss;
+
+    public RegisterJarRunnable(String jar, SessionState ss) {
+      this.jar = jar;
+      this.ss = ss;
+    }
+
+    public void run() {
+      SessionState.start(ss);
+      SessionState.registerJar(jar);
+      loader = Thread.currentThread().getContextClassLoader();
+    }
+  }
+
+  @Test
+  public void testClassLoaderEquality() throws Exception {
+    HiveConf conf = new HiveConf();
+    final SessionState ss1 = new SessionState(conf);
+    RegisterJarRunnable otherThread = new RegisterJarRunnable("./build/contrib/test/test-udfs.jar", ss1);
+    Thread th1 = new Thread(otherThread);
+    th1.start();
+    th1.join();
+
+    // set state in current thread
+    SessionState.start(ss1);
+    SessionState ss2 = SessionState.get();
+    ClassLoader loader2 = ss2.conf.getClassLoader();
+
+    System.out.println("Loader1:(Set in other thread) " + otherThread.loader);
+    System.out.println("Loader2:(Set in SessionState.conf) " + loader2);
+    System.out.println("Loader3:(CurrentThread.getContextClassLoader()) " +
+        Thread.currentThread().getContextClassLoader());
+    assertEquals("Other thread loader and session state loader",
+        otherThread.loader, loader2);
+    assertEquals("Other thread loader and current thread loader",
+        otherThread.loader, Thread.currentThread().getContextClassLoader());
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFOPNumeric.java ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFOPNumeric.java
index 2ada2ff..75f2b3a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFOPNumeric.java
+++ ql/src/test/org/apache/hadoop/hive/ql/udf/generic/TestGenericUDFOPNumeric.java
@@ -8,7 +8,6 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hive.common.HiveCompat;
 import org.junit.Assert;
 
 public abstract class TestGenericUDFOPNumeric {
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index ace791a..1a7982b 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -197,6 +197,7 @@ private void cleanup(OperationState state) throws HiveSQLException {
       driver.close();
       driver.destroy();
     }
+    driver = null;
 
     SessionState ss = SessionState.get();
     if (ss.getTmpOutputFile() != null) {
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index ef5b5c6..f730119 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -35,7 +35,6 @@
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.FetchOrientation;
@@ -144,7 +143,7 @@ protected synchronized void acquire() throws HiveSQLException {
 
   protected synchronized void release() {
     assert sessionState != null;
-    // no need to release sessionState...
+    SessionState.detachSession();
   }
 
   @Override
@@ -411,10 +410,10 @@ public void close() throws HiveSQLException {
         hiveHist.closeStream();
       }
       sessionState.close();
-      release();
     } catch (IOException ioe) {
-      release();
       throw new HiveSQLException("Failure to close", ioe);
+    } finally {
+      release();
     }
   }
 