From da8c3d1263446a386e3a251f342bc17ff606f791 Mon Sep 17 00:00:00 2001 From: Gopal V Date: Wed, 16 Mar 2016 14:53:44 -0700 Subject: [PATCH] Use new configs to enable/disable permanent UDFs add udfs at the end of the classpath add llap daemon conf --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 +- llap-server/bin/runLlapDaemon.sh | 2 +- .../hadoop/hive/llap/cli/LlapServiceDriver.java | 77 ++++++++++++++++++++++ .../hadoop/hive/llap/daemon/impl/LlapDaemon.java | 14 ++-- .../impl/StaticPermanentFunctionChecker.java | 70 ++++++++++++++++++++ .../hive/ql/optimizer/physical/LlapDecider.java | 2 +- 6 files changed, 162 insertions(+), 10 deletions(-) create mode 100644 llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 98c6372..57f0668 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -323,7 +323,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL.varname); - llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOW_PERMANENT_FNS.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION.varname); @@ -2569,6 +2570,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Whether to skip the compile-time check for non-built-in UDFs when deciding whether to\n" + "execute tasks in LLAP. Skipping the check allows executing UDFs from pre-localized\n" + "jars in LLAP; if the jars are not pre-localized, the UDFs will simply fail to load."), + LLAP_ALLOW_PERMANENT_FNS("hive.llap.allow.permanent.fns", true, + "Whether LLAP decider should allow permanent UDFs."), LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none", new StringSet("auto", "none", "all", "map"), "Chooses whether query fragments will run in container or in llap"), @@ -2664,7 +2667,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10, "Number of threads to use in LLAP task communicator in Tez AM.", "llap.daemon.communicator.num.threads"), - LLAP_DAEMON_ALLOW_PERMANENT_FNS("hive.llap.daemon.allow.permanent.fns", false, + LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false, "Whether LLAP daemon should localize the resources for permanent UDFs."), LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS( "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms", diff --git llap-server/bin/runLlapDaemon.sh llap-server/bin/runLlapDaemon.sh index 445a41e..09c3813 100755 --- llap-server/bin/runLlapDaemon.sh +++ llap-server/bin/runLlapDaemon.sh @@ -82,7 +82,7 @@ if [ ! -n "${LLAP_DAEMON_LOG_LEVEL}" ]; then LLAP_DAEMON_LOG_LEVEL=${LOG_LEVEL_DEFAULT} fi -CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:${LLAP_DAEMON_HOME}/lib/tez/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:. +CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:${LLAP_DAEMON_HOME}/lib/tez/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:${LLAP_DAEMON_HOME}/lib/udfs/*:. if [ -n "LLAP_DAEMON_USER_CLASSPATH" ]; then CLASSPATH=${CLASSPATH}:${LLAP_DAEMON_USER_CLASSPATH} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 508ce27..97d7a96 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -20,14 +20,25 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Map.Entry; import java.util.Collection; +import java.util.List; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; +import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.tez.dag.api.TezConfiguration; @@ -43,9 +54,18 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions; import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ResourceUri; +import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; +import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; @@ -256,6 +276,7 @@ private void run(String[] args) throws Exception { Path libDir = new Path(tmpDir, "lib"); Path tezDir = new Path(libDir, "tez"); + Path udfDir = new Path(libDir, "udfs"); String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS); if (tezLibs == null) { @@ -329,6 +350,15 @@ private void run(String[] args) throws Exception { } } + // UDFs + final Set allowedUdfs; + + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) { + allowedUdfs = downloadPermanentFunctions(conf, udfDir); + } else { + allowedUdfs = Collections.emptySet(); + } + String java_home; if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) { java_home = System.getenv("JAVA_HOME"); @@ -369,6 +399,14 @@ private void run(String[] args) throws Exception { IOUtils.copyBytes(loggerContent, lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true); + PrintWriter udfStream = + new PrintWriter(lfs.create(new Path(confPath, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST))); + for (String udfClass : allowedUdfs) { + udfStream.println(udfClass); + } + + udfStream.close(); + // extract configs for processing by the python fragments in Slider JSONObject configs = new JSONObject(); @@ -419,6 +457,45 @@ private void run(String[] args) throws Exception { } } + private Set downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException, + URISyntaxException, IOException { + Map udfs = new HashMap(); + Hive hive = Hive.get(false); + // We allow embedded metastore inside + try { + hive.getMSC(); + } catch (MetaException e) { + throw new HiveException(e); + } + ResourceDownloader resourceDownloader = + new ResourceDownloader(conf, udfDir.toUri().normalize().getPath()); + List fns = hive.getAllFunctions(); + Set srcUris = new HashSet<>(); + for (Function fn : fns) { + String fqfn = fn.getDbName() + "." + fn.getFunctionName(); + if (udfs.containsKey(fn.getClassName())) { + LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn + + " and " + udfs.get(fn.getClassName())); + } + udfs.put(fn.getClassName(), fqfn); + List resources = fn.getResourceUris(); + if (resources == null || resources.isEmpty()) { + LOG.warn("Missing resources for " + fqfn); + continue; + } + for (ResourceUri resource : resources) { + srcUris.add(ResourceDownloader.createURI(resource.getUri())); + } + } + for (URI srcUri : srcUris) { + List localUris = resourceDownloader.downloadExternal(srcUri, null, false); + for(URI dst : localUris) { + LOG.warn("Downloaded " + dst + " from " + srcUri); + } + } + return udfs.keySet(); + } + private void localizeJarForClass(FileSystem lfs, Path libDir, String className, boolean doThrow) throws IOException { String jarPath = null; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 165830c..ff9ce59 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; @@ -168,7 +169,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor // Initialize the function localizer. ClassLoader executorClassLoader = null; - if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS)) { + if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS)) { this.fnLocalizer = new FunctionLocalizer(daemonConf, localDirs[0]); executorClassLoader = fnLocalizer.getClassLoader(); // Set up the hook that will disallow creating non-whitelisted UDFs anywhere in the plan. @@ -177,6 +178,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(fnLocalizer)); } else { this.fnLocalizer = null; + SerializationUtilities.setGlobalHook(new LlapGlobalUdfChecker(new StaticPermanentFunctionChecker(daemonConf))); executorClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -449,9 +451,9 @@ public long getMaxJvmMemory() { * us into GenericUDFBridge-s, to check with the whitelist before instantiating a UDF. */ private static final class LlapGlobalUdfChecker extends SerializationUtilities.Hook { - private FunctionLocalizer fnLocalizer; - public LlapGlobalUdfChecker(FunctionLocalizer fnLocalizer) { - this.fnLocalizer = fnLocalizer; + private UdfWhitelistChecker fnCheckerImpl; + public LlapGlobalUdfChecker(UdfWhitelistChecker fnCheckerImpl) { + this.fnCheckerImpl = fnCheckerImpl; } @Override @@ -460,7 +462,7 @@ public boolean preRead(Class type) { // 2) Ignore GenericUDFBridge, it's checked separately in LlapUdfBridgeChecker. if (GenericUDFBridge.class == type) return true; // Run post-hook. if (!(GenericUDF.class.isAssignableFrom(type) || UDF.class.isAssignableFrom(type)) - || fnLocalizer.isUdfAllowed(type)) return false; + || fnCheckerImpl.isUdfAllowed(type)) return false; throw new SecurityException("UDF " + type.getCanonicalName() + " is not allowed"); } @@ -469,7 +471,7 @@ public Object postRead(Object o) { if (o == null) return o; Class type = o.getClass(); if (GenericUDFBridge.class == type) { - ((GenericUDFBridge)o).setUdfChecker(fnLocalizer); + ((GenericUDFBridge)o).setUdfChecker(fnCheckerImpl); } // This won't usually be called otherwise. preRead(type); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java new file mode 100644 index 0000000..15968fa --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StaticPermanentFunctionChecker.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URL; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.jdi.InvocationException; + +public class StaticPermanentFunctionChecker implements UdfWhitelistChecker { + private static final Logger LOG = LoggerFactory.getLogger(StaticPermanentFunctionChecker.class); + + public static final String PERMANENT_FUNCTIONS_LIST = "llap-udfs.lst"; + + private final IdentityHashMap, Boolean> allowedUdfClasses = new IdentityHashMap<>(); + + public StaticPermanentFunctionChecker(Configuration conf) { + URL logger = conf.getResource(PERMANENT_FUNCTIONS_LIST); + if (logger == null) { + LOG.warn("Could not find UDF whitelist in configuration: " + PERMANENT_FUNCTIONS_LIST); + return; + } + try { + BufferedReader r = new BufferedReader(new InputStreamReader(logger.openStream())); + String klassName = r.readLine(); + while (klassName != null) { + try { + Class clazz = Class.forName(klassName.trim(), false, this.getClass().getClassLoader()); + allowedUdfClasses.put(clazz, true); + // make a list before opening the RPC attack surface + } catch (ClassNotFoundException ie) { + // note: explicit format to use Throwable instead of var-args + LOG.warn("Could not load class " + klassName + " declared in UDF whitelist", ie); + } + klassName = r.readLine(); + } + } catch (IOException ioe) { + LOG.warn("Could not read UDF whitelist: " + PERMANENT_FUNCTIONS_LIST, ioe); + } + } + + @Override + public boolean isUdfAllowed(Class clazz) { + return FunctionRegistry.isBuiltInFuncClass(clazz) || allowedUdfClasses.containsKey(clazz); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 194828f..737d9c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -107,7 +107,7 @@ public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) { conf = pctx.getConf(); doSkipUdfCheck = HiveConf.getBoolVar(conf, ConfVars.LLAP_SKIP_COMPILE_UDF_CHECK); - arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS); + arePermanentFnsAllowed = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOW_PERMANENT_FNS); // Don't user uber in "all" mode - everything can go into LLAP, which is better than uber. shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all); } -- 2.4.0