diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 3b365c04a2..64c72278a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -259,37 +259,51 @@ public URI apply(Path path) { dag.addURIsForCredentials(uris); } + getCredentialsForFileSinks(mapWork, dag); + } + + private void addCredentials(ReduceWork reduceWork, DAG dag) { + getCredentialsForFileSinks(reduceWork, dag); + } + + private void addCredentials(MergeJoinWork mergeJoinWork, DAG dag) { + getCredentialsForFileSinks(mergeJoinWork, dag); + } + + private void getCredentialsForFileSinks(BaseWork baseWork, DAG dag) { Set fileSinkUris = new HashSet(); - List topNodes = new ArrayList(); - Map> aliasToWork = mapWork.getAliasToWork(); - for (Operator operator : aliasToWork.values()) { - topNodes.add(operator); - } + List topNodes = getTopNodes(baseWork); + + LOG.debug("Collecting file sink uris for {} topnodes: {}", baseWork.getClass(), topNodes); collectFileSinkUris(topNodes, fileSinkUris); if (LOG.isDebugEnabled()) { for (URI fileSinkUri: fileSinkUris) { - LOG.debug("Marking MapWork output URI as needing credentials: " + fileSinkUri); + LOG.debug("Marking {} output URI as needing credentials (filesink): {}", baseWork.getClass(), fileSinkUri); } } dag.addURIsForCredentials(fileSinkUris); } - private void addCredentials(ReduceWork reduceWork, DAG dag) { - - Set fileSinkUris = new HashSet(); - + private List getTopNodes(BaseWork work) { List topNodes = new ArrayList(); - topNodes.add(reduceWork.getReducer()); - collectFileSinkUris(topNodes, fileSinkUris); - if (LOG.isDebugEnabled()) { - for (URI fileSinkUri: fileSinkUris) { - LOG.debug("Marking ReduceWork output URI as needing credentials: " + fileSinkUri); + if (work instanceof MapWork) { + Map> aliasToWork = ((MapWork) work).getAliasToWork(); + for (Operator operator : aliasToWork.values()) { + topNodes.add(operator); + } + } else if (work instanceof ReduceWork) { + topNodes.add(((ReduceWork) work).getReducer()); + } else if (work instanceof MergeJoinWork) { + for (Operator operator : ((MergeJoinWork) work) + .getAllRootOperators()) { + topNodes.add(operator); } } - dag.addURIsForCredentials(fileSinkUris); + + return topNodes; } /* @@ -1510,6 +1524,8 @@ public void addCredentials(BaseWork work, DAG dag) throws IOException { addCredentials((MapWork) work, dag); } else if (work instanceof ReduceWork) { addCredentials((ReduceWork) work, dag); + } else if (work instanceof MergeJoinWork) { + addCredentials((MergeJoinWork) work, dag); } }