diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 7524d7ebd0..94ef91232f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -106,6 +106,7 @@
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -238,58 +239,75 @@ private void collectFileSinkUris(List<Node> topNodes, Set<URI> uris) {
     }
   }
 
-  private void addCredentials(MapWork mapWork, DAG dag) {
-    Set<Path> paths = mapWork.getPathToAliases().keySet();
-    if (!paths.isEmpty()) {
-      Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<Path, URI>() {
-        @Override
-        public URI apply(Path path) {
-          return path.toUri();
-        }
-      });
-      Set<URI> uris = new HashSet<URI>();
-      Iterators.addAll(uris, pathIterator);
+  /**
+   * Set up credentials for the base work on secure clusters
+   */
+  public void addCredentials(BaseWork work, DAG dag) {
+    if (work instanceof MapWork){
+      Set<Path> paths = ((MapWork)work).getPathToAliases().keySet();
+      if (!paths.isEmpty()) {
+        Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<Path, URI>() {
+          @Override
+          public URI apply(Path path) {
+            return path.toUri();
+          }
+        });
+
+        Set<URI> uris = new HashSet<URI>();
+        Iterators.addAll(uris, pathIterator);
 
-      if (LOG.isDebugEnabled()) {
-        for (URI uri: uris) {
-          LOG.debug("Marking MapWork input URI as needing credentials: " + uri);
+        if (LOG.isDebugEnabled()) {
+          for (URI uri: uris) {
+            LOG.debug("Marking MapWork input URI as needing credentials: " + uri);
+          }
         }
+        dag.addURIsForCredentials(uris);
       }
-      dag.addURIsForCredentials(uris);
     }
+    getCredentialsForFileSinks(work, dag);
+  }
+
+  private void getCredentialsForFileSinks(BaseWork baseWork, DAG dag) {
     Set<URI> fileSinkUris = new HashSet<URI>();
-    List<Node> topNodes = new ArrayList<Node>();
-    Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
-    for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) {
-      topNodes.add(operator);
-    }
+    List<Node> topNodes = getTopNodes(baseWork);
+
+    LOG.debug("Collecting file sink uris for {} topnodes: {}", baseWork.getClass(), topNodes);
     collectFileSinkUris(topNodes, fileSinkUris);
 
     if (LOG.isDebugEnabled()) {
-      for (URI fileSinkUri: fileSinkUris) {
-        LOG.debug("Marking MapWork output URI as needing credentials: " + fileSinkUri);
+      for (URI fileSinkUri : fileSinkUris) {
+        LOG.debug("Marking {} output URI as needing credentials (filesink): {}",
+            baseWork.getClass(), fileSinkUri);
       }
     }
     dag.addURIsForCredentials(fileSinkUris);
   }
 
-  private void addCredentials(ReduceWork reduceWork, DAG dag) {
-
-    Set<URI> fileSinkUris = new HashSet<URI>();
-
+  private List<Node> getTopNodes(BaseWork work) {
     List<Node> topNodes = new ArrayList<Node>();
-    topNodes.add(reduceWork.getReducer());
-    collectFileSinkUris(topNodes, fileSinkUris);
-    if (LOG.isDebugEnabled()) {
-      for (URI fileSinkUri: fileSinkUris) {
-        LOG.debug("Marking ReduceWork output URI as needing credentials: " + fileSinkUri);
+    if (work instanceof MapWork) {
+      Map<String, Operator<? extends OperatorDesc>> aliasToWork = ((MapWork) work).getAliasToWork();
+      for (Operator<? extends OperatorDesc> operator : aliasToWork.values()) {
+        topNodes.add(operator);
+      }
+    } else if (work instanceof ReduceWork) {
+      topNodes.add(((ReduceWork) work).getReducer());
+    } else if (work instanceof MergeJoinWork) {
+      for (Operator<? extends OperatorDesc> operator : ((MergeJoinWork) work)
+          .getAllRootOperators()) {
+        topNodes.add(operator);
+      }
+    } else if (work instanceof UnionWork) {
+      for (Operator<? extends OperatorDesc> operator : ((UnionWork) work).getAllRootOperators()) {
+        topNodes.add(operator);
       }
     }
-    dag.addURIsForCredentials(fileSinkUris);
+
+    return topNodes;
   }
 
   /*
@@ -1501,18 +1519,6 @@ public Vertex createVertex(JobConf conf, BaseWork work,
     return v;
   }
 
-  /**
-   * Set up credentials for the base work on secure clusters
-   */
-  public void addCredentials(BaseWork work, DAG dag) throws IOException {
-    dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
-    if (work instanceof MapWork) {
-      addCredentials((MapWork) work, dag);
-    } else if (work instanceof ReduceWork) {
-      addCredentials((ReduceWork) work, dag);
-    }
-  }
-
   /**
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
index a446a95972..f65484bf04 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
@@ -18,18 +18,14 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
 
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.mapred.JobConf;
 
 /**