diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 815ccfa..38b1f2f 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -46,6 +48,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,6 +58,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hive.common.util.ReflectionUtil; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.datanucleus.ClassLoaderResolver; import org.datanucleus.NucleusContext; @@ -916,4 +921,144 @@ private int getNucleusClassLoaderResolverMapSize() { } return -1; } + + /** + * Tests ADD JAR uses Hives ReflectionUtil.CONSTRUCTOR_CACHE + * + * @throws Exception + */ + @Test + public void testAddJarConstructorUnCaching() throws Exception { + // This test assumes the hive-contrib JAR has been built as part of the Hive build. + // Also dependent on the UDFExampleAdd class within that JAR. + setReflectionUtilCache(); + String udfClassName = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd"; + String mvnRepo = System.getProperty("maven.local.repository"); + String hiveVersion = System.getProperty("hive.version"); + String jarFileName = "hive-contrib-" + hiveVersion + ".jar"; + String[] pathParts = { + "org", "apache", "hive", + "hive-contrib", hiveVersion, jarFileName + }; + + // Create path to hive-contrib JAR on local filesystem + Path jarFilePath = new Path(mvnRepo); + for (String pathPart : pathParts) { + jarFilePath = new Path(jarFilePath, pathPart); + } + + Connection conn = getConnection(miniHS2.getJdbcURL(), "foo", "bar"); + String tableName = "testAddJar"; + Statement stmt = conn.createStatement(); + stmt.execute("SET hive.support.concurrency = false"); + // Create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + stmt.execute("CREATE TABLE " + tableName + " (key INT, value STRING)"); + // Load data + stmt.execute("LOAD DATA LOCAL INPATH '" + kvDataFilePath.toString() + "' INTO TABLE " + + tableName); + ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); + // Ensure table is populated + assertTrue(res.next()); + + long cacheBeforeAddJar, cacheAfterAddJar, cacheAfterClose; + // Force the cache clear so we know its empty + invalidateReflectionUtlCache(); + cacheBeforeAddJar = getReflectionUtilCacheSize(); + System.out.println("CONSTRUCTOR_CACHE size before add jar: " + cacheBeforeAddJar); + System.out.println("CONSTRUCTOR_CACHE as map before add jar:" + getReflectionUtilCache().asMap()); + Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size before add jar: " + cacheBeforeAddJar, + cacheBeforeAddJar == 0); + + // Add the jar file + stmt.execute("ADD JAR " + jarFilePath.toString()); + // Create a temporary function using the jar + stmt.execute("CREATE TEMPORARY FUNCTION func AS '" + udfClassName + "'"); + // Execute the UDF + res = stmt.executeQuery("SELECT func(value) from " + tableName); + assertTrue(res.next()); + + // Check to make sure the cache is now being used + cacheAfterAddJar = getReflectionUtilCacheSize(); + System.out.println("CONSTRUCTOR_CACHE size after add jar: " + cacheAfterAddJar); + Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterAddJar, + cacheAfterAddJar > 0); + conn.close(); + TimeUnit.SECONDS.sleep(10); + // Have to force a cleanup of all expired entries here because its possible that the + // expired entries will still be counted in Cache.size(). + // Taken from: + // http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/cache/CacheBuilder.html + cleanUpReflectionUtlCache(); + cacheAfterClose = getReflectionUtilCacheSize(); + System.out.println("CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose); + Assert.assertTrue("FAILED: CONSTRUCTOR_CACHE size after connection close: " + cacheAfterClose, + cacheAfterClose == 0); + } + + private void setReflectionUtilCache() { + Field constructorCacheField; + Cache, Constructor> tmp; + try { + constructorCacheField = ReflectionUtil.class.getDeclaredField("CONSTRUCTOR_CACHE"); + if (constructorCacheField != null) { + constructorCacheField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(constructorCacheField, constructorCacheField.getModifiers() & ~Modifier.FINAL); + tmp = CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.SECONDS).concurrencyLevel(64).weakKeys().weakValues().build(); + constructorCacheField.set(tmp.getClass(), tmp); + } + } catch (Exception e) { + System.out.println("Error when setting the CONSTRUCTOR_CACHE to expire: " + e); + } + } + + private Cache getReflectionUtilCache() { + Field constructorCacheField; + try { + constructorCacheField = ReflectionUtil.class.getDeclaredField("CONSTRUCTOR_CACHE"); + if (constructorCacheField != null) { + constructorCacheField.setAccessible(true); + return (Cache) constructorCacheField.get(null); + } + } catch (Exception e) { + System.out.println("Error when getting the CONSTRUCTOR_CACHE var: " + e); + } + return null; + } + + private void invalidateReflectionUtlCache() { + try { + Cache constructorCache = getReflectionUtilCache(); + if ( constructorCache != null ) { + constructorCache.invalidateAll(); + } + } catch (Exception e) { + System.out.println("Error when trying to invalidate the cache: " + e); + } + } + + private void cleanUpReflectionUtlCache() { + try { + Cache constructorCache = getReflectionUtilCache(); + if ( constructorCache != null ) { + constructorCache.cleanUp(); + } + } catch (Exception e) { + System.out.println("Error when trying to cleanUp the cache: " + e); + } + } + + private long getReflectionUtilCacheSize() { + try { + Cache constructorCache = getReflectionUtilCache(); + if ( constructorCache != null ) { + return constructorCache.size(); + } + } catch (Exception e) { + System.out.println(e); + } + return -1; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java index 3b54b49..891514b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.common.util.ReflectionUtil; import java.io.IOException; import java.util.Collections; @@ -125,7 +125,7 @@ public FunctionInfo registerFunction( case GENERIC_UDAF_RESOLVER: return registerGenericUDAF( functionName, (GenericUDAFResolver) - ReflectionUtils.newInstance(udfClass, null), resources); + ReflectionUtil.newInstance(udfClass, null), resources); case TABLE_FUNCTION_RESOLVER: // native or not would be decided by annotation. need to evaluate that first return registerTableFunction(functionName, @@ -154,7 +154,7 @@ public FunctionInfo registerGenericUDF(String functionName, Class genericUDFClass, FunctionResource... resources) { validateClass(genericUDFClass, GenericUDF.class); FunctionInfo fI = new FunctionInfo(isNative, functionName, - ReflectionUtils.newInstance(genericUDFClass, null), resources); + ReflectionUtil.newInstance(genericUDFClass, null), resources); addFunction(functionName, fI); return fI; } @@ -179,7 +179,7 @@ public FunctionInfo registerGenericUDTF(String functionName, Class genericUDTFClass, FunctionResource... resources) { validateClass(genericUDTFClass, GenericUDTF.class); FunctionInfo fI = new FunctionInfo(isNative, functionName, - ReflectionUtils.newInstance(genericUDTFClass, null), resources); + ReflectionUtil.newInstance(genericUDTFClass, null), resources); addFunction(functionName, fI); return fI; } @@ -197,7 +197,7 @@ public FunctionInfo registerUDAF(String functionName, Class udafClass, FunctionResource... resources) { validateClass(udafClass, UDAF.class); FunctionInfo function = new WindowFunctionInfo(isNative, functionName, - new GenericUDAFBridge(ReflectionUtils.newInstance(udafClass, null)), resources); + new GenericUDAFBridge(ReflectionUtil.newInstance(udafClass, null)), resources); addFunction(functionName, function); addFunction(WINDOW_FUNC_PREFIX + functionName, function); return function;