diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 428e0ff..e9ec9b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -55,6 +55,7 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -124,14 +125,9 @@ public int execute(DriverContext driverContext) { // create the tez tmp dir scratchDir = utils.createTezDir(scratchDir, conf); - if (!session.isOpen()) { - // can happen if the user sets the tez flag after the session was - // established - LOG.info("Tez session hasn't been created yet. Opening session"); - session.open(conf, inputOutputJars); - } else { - session.refreshLocalResourcesFromConf(conf); - } + Map inputOutputLocalResources = null; + + updateSession(session, jobConf, scratchDir, inputOutputJars); List additionalLr = session.getLocalizedResources(); @@ -153,6 +149,12 @@ public int execute(DriverContext driverContext) { // next we translate the TezWork to a Tez DAG DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx); + // Extra resources from the StorageHandler must be added to the DAG + // This is done automatically for new TezClients, but must be done + // if we added additional files to an already open session. + addExtraResourcesToDag(session, dag, jobConf, scratchDir.toString(), + inputOutputJars, inputOutputLocalResources); + // submit will send the job to the cluster and start executing client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr); @@ -195,6 +197,80 @@ public int execute(DriverContext driverContext) { return rc; } + /** + * Ensures that the Tez Session is open and the AM + * has all necessary jars configured. + */ + Map updateSession(TezSessionState session, + JobConf jobConf, Path scratchDir, String[] inputOutputJars) throws Exception { + final boolean missingLocalResources = !session + .hasResources(inputOutputJars); + Map inputOutputLocalResources = null; + + if (!session.isOpen()) { + // can happen if the user sets the tez flag after the session was + // established + LOG.info("Tez session hasn't been created yet. Opening session"); + session.open(conf, inputOutputJars); + } else { + LOG.info("Session is already open"); + + // Ensure the open session has the necessary resources (StorageHandler) + if (missingLocalResources) { + LOG.info("Tez session missing resources," + + " adding additional necessary resources"); + // Localize the jars + List localResources = utils.localizeTempFiles( + scratchDir.toString(), jobConf, inputOutputJars); + if (null != localResources) { + inputOutputLocalResources = new HashMap(); + for (LocalResource lr : localResources) { + inputOutputLocalResources.put(utils.getBaseName(lr), lr); + } + // Add them to the AM + session.getSession().addAppMasterLocalFiles(inputOutputLocalResources); + } + } + + session.refreshLocalResourcesFromConf(conf); + } + + return inputOutputLocalResources; + } + + /** + * Adds any necessary resources that must be localized in each + * vertex to the DAG. + */ + void addExtraResourcesToDag(TezSessionState session, DAG dag, + JobConf jobConf, String scratchDir, String[] inputOutputJars, + Map inputOutputLocalResources) throws Exception { + if (!session.hasResources(inputOutputJars)) { + // Double check to make sure that we initialize the resources + if (null == inputOutputLocalResources) { + List localResources = utils.localizeTempFiles(scratchDir, jobConf, inputOutputJars); + if (null != localResources) { + inputOutputLocalResources = new HashMap(); + for (LocalResource lr : localResources) { + inputOutputLocalResources.put(utils.getBaseName(lr), lr); + } + } + } + + if (null != inputOutputLocalResources) { + try { + dag.addTaskLocalFiles(inputOutputLocalResources); + } catch (TezUncheckedException e) { + // TezCommonUtils.addAdditionalLocalResources will throw an exception + // if any of the resources were already added, but we have no means to check + // it before trying to add the resources. If they have one of the resources + // hopefully it has all of them.. + LOG.debug("Ignored exception when adding extra local resources to DAG", e); + } + } + } + } + DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List additionalLr, Context ctx) throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 456b5eb..5f827eb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -19,12 +19,13 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -305,15 +306,21 @@ public int compareTo(Dependency o) { work.configureJobConf(jobConf); } String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY); - if (oldTmpJars != null && (oldTmpJars.length != 0)) { - if (newTmpJars != null && (newTmpJars.length != 0)) { - String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length]; - System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length); - System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length); - jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars); - } else { - jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars); + if (oldTmpJars != null || newTmpJars != null) { + int oldLength = oldTmpJars == null ? 0 : oldTmpJars.length, + newLength = newTmpJars == null ? 0 : newTmpJars.length; + String[] combinedTmpJars = new String[oldLength + newLength]; + if (oldLength > 0) { + System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldLength); + } + if (newLength > 0) { + System.arraycopy(newTmpJars, 0, combinedTmpJars, oldLength, newLength); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Updating tmpjars to " + Arrays.toString(combinedTmpJars)); } + jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars); + return combinedTmpJars; } return newTmpJars; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 45ab672..514d5a4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -30,9 +30,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -216,4 +219,39 @@ public void testClose() throws HiveException { task.close(work, 0); verify(op, times(4)).jobClose(any(Configuration.class), eq(true)); } + + @Test + public void testExistingSessionGetsStorageHandlerResources() throws Exception { + final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + LocalResource res = mock(LocalResource.class); + final List resources = Collections.singletonList(res); + final Map resMap = new HashMap(); + resMap.put("foo.jar", res); + + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + .thenReturn(resources); + when(utils.getBaseName(res)).thenReturn("foo.jar"); + when(sessionState.isOpen()).thenReturn(true); + when(sessionState.hasResources(inputOutputJars)).thenReturn(false); + task.updateSession(sessionState, conf, path, inputOutputJars); + verify(session).addAppMasterLocalFiles(resMap); + } + + @Test + public void testExtraResourcesAddedToDag() throws Exception { + final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + LocalResource res = mock(LocalResource.class); + final List resources = Collections.singletonList(res); + final Map resMap = new HashMap(); + resMap.put("foo.jar", res); + DAG dag = mock(DAG.class); + + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + .thenReturn(resources); + when(utils.getBaseName(res)).thenReturn("foo.jar"); + when(sessionState.isOpen()).thenReturn(true); + when(sessionState.hasResources(inputOutputJars)).thenReturn(false); + task.addExtraResourcesToDag(sessionState, dag, conf, path.toString(), inputOutputJars, resMap); + verify(dag).addTaskLocalFiles(resMap); + } }