diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 2dc2066..77c0c46 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,18 +17,27 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,6 +76,7 @@
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -96,9 +106,44 @@
  */
 public class DagUtils {
 
+  private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
   private static final String TEZ_DIR = "_tez_scratch_dir";
   private static DagUtils instance;
 
+  /**
+   * Marks the URI of every input path of the given map work as needing
+   * credentials on the DAG.
+   */
+  private void addCredentials(MapWork mapWork, DAG dag) {
+    Set<String> paths = mapWork.getPathToAliases().keySet();
+    if (paths != null && !paths.isEmpty()) {
+      Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<String, URI>() {
+        @Override
+        public URI apply(String input) {
+          return new Path(input).toUri();
+        }
+      });
+
+      Set<URI> uris = new HashSet<URI>();
+      Iterators.addAll(uris, pathIterator);
+
+      if (LOG.isDebugEnabled()) {
+        for (URI uri: uris) {
+          LOG.debug("Marking URI as needing credentials: "+uri);
+        }
+      }
+      dag.addURIsForCredentials(uris);
+    }
+  }
+
+  /**
+   * Reduce-side counterpart of the map work overload; registers nothing at
+   * the moment.
+   */
+  private void addCredentials(ReduceWork reduceWork, DAG dag) {
+    // nothing at the moment
+  }
+
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
@@ -651,6 +696,22 @@ public Vertex createVertex(JobConf conf, BaseWork work,
   }
 
   /**
+   * Set up credentials for the base work on secure clusters. Dispatches on
+   * the concrete work type; work that is neither MapWork nor ReduceWork is
+   * left untouched.
+   *
+   * @param work the work whose URIs may need credentials
+   * @param dag the DAG the URIs are registered with
+   */
+  public void addCredentials(BaseWork work, DAG dag) {
+    if (work instanceof MapWork) {
+      addCredentials((MapWork) work, dag);
+    } else if (work instanceof ReduceWork) {
+      addCredentials((ReduceWork) work, dag);
+    }
+  }
+
+  /**
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 22a21c9..792e9a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -210,6 +210,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
       Vertex wx = utils.createVertex(wxConf, w, tezDir,
           appJarLr, additionalLr, fs, ctx, !isFinal);
       dag.addVertex(wx);
+      utils.addCredentials(w, dag);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
       workToVertex.put(w, wx);
       workToConf.put(w, wxConf);