diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 0d0ac41..7487253 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -270,11 +270,16 @@ public TezSessionState getSession(TezSessionState session, HiveConf conf, public void closeAndOpen(TezSessionState sessionState, HiveConf conf) throws Exception { + closeAndOpen(sessionState, conf, null); + } + + public void closeAndOpen(TezSessionState sessionState, HiveConf conf, + String[] additionalFiles) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionConf != null && sessionConf.get("tez.queue.name") != null) { conf.set("tez.queue.name", sessionConf.get("tez.queue.name")); } close(sessionState); - sessionState.open(conf); + sessionState.open(conf, additionalFiles); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 428e0ff..51df835 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -55,6 +55,7 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; @@ -124,14 +125,11 @@ public int execute(DriverContext driverContext) { // create the tez tmp dir scratchDir = utils.createTezDir(scratchDir, conf); - if (!session.isOpen()) { - // can happen if the user sets the tez flag after the session was - // established - LOG.info("Tez session hasn't been created yet. 
Opening session"); - session.open(conf, inputOutputJars); - } else { - session.refreshLocalResourcesFromConf(conf); - } + Map inputOutputLocalResources = + getExtraLocalResources(jobConf, scratchDir, inputOutputJars); + + // Ensure the session is open and has the necessary local resources + updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources); List additionalLr = session.getLocalizedResources(); @@ -153,8 +151,12 @@ public int execute(DriverContext driverContext) { // next we translate the TezWork to a Tez DAG DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx); + // Add the extra resources to the dag + addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources); + // submit will send the job to the cluster and start executing - client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr); + client = submit(jobConf, dag, scratchDir, appJarLr, session, + additionalLr, inputOutputJars, inputOutputLocalResources); // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); @@ -195,6 +197,63 @@ public int execute(DriverContext driverContext) { return rc; } + /** + * Converts the list of jars into local resources + */ + Map getExtraLocalResources(JobConf jobConf, Path scratchDir, + String[] inputOutputJars) throws Exception { + final Map resources = new HashMap(); + final List localResources = utils.localizeTempFiles( + scratchDir.toString(), jobConf, inputOutputJars); + if (null != localResources) { + for (LocalResource lr : localResources) { + resources.put(utils.getBaseName(lr), lr); + } + } + return resources; + } + + /** + * Ensures that the Tez Session is open and the AM has all necessary jars configured. 
+ */ + void updateSession(TezSessionState session, + JobConf jobConf, Path scratchDir, String[] inputOutputJars, + Map extraResources) throws Exception { + final boolean missingLocalResources = !session + .hasResources(inputOutputJars); + + if (!session.isOpen()) { + // can happen if the user sets the tez flag after the session was + // established + LOG.info("Tez session hasn't been created yet. Opening session"); + session.open(conf, inputOutputJars); + } else { + LOG.info("Session is already open"); + + // Ensure the open session has the necessary resources (StorageHandler) + if (missingLocalResources) { + LOG.info("Tez session missing resources," + + " adding additional necessary resources"); + session.getSession().addAppMasterLocalFiles(extraResources); + } + + session.refreshLocalResourcesFromConf(conf); + } + } + + /** + * Adds any necessary resources that must be localized in each vertex to the DAG. + */ + void addExtraResourcesToDag(TezSessionState session, DAG dag, + String[] inputOutputJars, + Map inputOutputLocalResources) throws Exception { + if (!session.hasResources(inputOutputJars)) { + if (null != inputOutputLocalResources) { + dag.addTaskLocalFiles(inputOutputLocalResources); + } + } + } + DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List additionalLr, Context ctx) throws Exception { @@ -287,7 +346,8 @@ DAG build(JobConf conf, TezWork work, Path scratchDir, DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr, TezSessionState sessionState, - List additionalLr) + List additionalLr, String[] inputOutputJars, + Map inputOutputLocalResources) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); @@ -308,7 +368,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, console.printInfo("Tez session was closed. 
Reopening..."); // close the old one, but keep the tmp files around - TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); + TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars); console.printInfo("Session re-established."); dagClient = sessionState.getSession().submitDAG(dag); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 456b5eb..73721e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -19,12 +19,13 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -305,15 +306,23 @@ public int compareTo(Dependency o) { work.configureJobConf(jobConf); } String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY); - if (oldTmpJars != null && (oldTmpJars.length != 0)) { - if (newTmpJars != null && (newTmpJars.length != 0)) { - String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length]; - System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length); - System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length); - jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars); + if (oldTmpJars != null || newTmpJars != null) { + String[] finalTmpJars; + if (oldTmpJars == null || oldTmpJars.length == 0) { + // Avoid a copy when oldTmpJars is null or empty + finalTmpJars = newTmpJars; + } else if (newTmpJars == null || newTmpJars.length == 0) { + // Avoid a copy when newTmpJars is null or empty + finalTmpJars = oldTmpJars; } else { - jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars); + // Both are 
non-empty, only copy now + finalTmpJars = new String[oldTmpJars.length + newTmpJars.length]; + System.arraycopy(oldTmpJars, 0, finalTmpJars, 0, oldTmpJars.length); + System.arraycopy(newTmpJars, 0, finalTmpJars, oldTmpJars.length, newTmpJars.length); } + + jobConf.setStrings(MR_JAR_PROPERTY, finalTmpJars); + return finalTmpJars; } return newTmpJars; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index ad5a6e7..c522687 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -26,6 +26,7 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.hive.conf.HiveConf; public class TestTezSessionPool { @@ -157,4 +158,29 @@ public void testReturn() { } } } + + @Test + public void testCloseAndOpenDefault() throws Exception { + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.isDefault()).thenReturn(false); + + poolManager.closeAndOpen(session, conf); + + Mockito.verify(session).close(false); + Mockito.verify(session).open(conf, null); + } + + @Test + public void testCloseAndOpenWithResources() throws Exception { + poolManager = new TestTezSessionPoolManager(); + TezSessionState session = Mockito.mock(TezSessionState.class); + Mockito.when(session.isDefault()).thenReturn(false); + String[] extraResources = new String[] { "file:///tmp/foo.jar" }; + + poolManager.closeAndOpen(session, conf, extraResources); + + Mockito.verify(session).close(false); + Mockito.verify(session).open(conf, extraResources); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 45ab672..44b4464 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -30,9 +30,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -204,9 +206,10 @@ public void testEmptyWork() throws IllegalArgumentException, IOException, Except @Test public void testSubmit() throws Exception { DAG dag = DAG.create("test"); - task.submit(conf, dag, path, appLr, sessionState, new LinkedList()); + task.submit(conf, dag, path, appLr, sessionState, Collections. emptyList(), + new String[0], Collections. emptyMap()); // validate close/reopen - verify(sessionState, times(1)).open(any(HiveConf.class)); + verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class)); verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043 verify(session, times(2)).submitDAG(any(DAG.class)); } @@ -216,4 +219,54 @@ public void testClose() throws HiveException { task.close(work, 0); verify(op, times(4)).jobClose(any(Configuration.class), eq(true)); } + + @Test + public void testExistingSessionGetsStorageHandlerResources() throws Exception { + final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + LocalResource res = mock(LocalResource.class); + final List resources = Collections.singletonList(res); + final Map resMap = new HashMap(); + resMap.put("foo.jar", res); + + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + .thenReturn(resources); + when(utils.getBaseName(res)).thenReturn("foo.jar"); + when(sessionState.isOpen()).thenReturn(true); + when(sessionState.hasResources(inputOutputJars)).thenReturn(false); + task.updateSession(sessionState, conf, path, inputOutputJars, resMap); + 
verify(session).addAppMasterLocalFiles(resMap); + } + + @Test + public void testExtraResourcesAddedToDag() throws Exception { + final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + LocalResource res = mock(LocalResource.class); + final List resources = Collections.singletonList(res); + final Map resMap = new HashMap(); + resMap.put("foo.jar", res); + DAG dag = mock(DAG.class); + + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + .thenReturn(resources); + when(utils.getBaseName(res)).thenReturn("foo.jar"); + when(sessionState.isOpen()).thenReturn(true); + when(sessionState.hasResources(inputOutputJars)).thenReturn(false); + task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap); + verify(dag).addTaskLocalFiles(resMap); + } + + @Test + public void testGetExtraLocalResources() throws Exception { + final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"}; + LocalResource res = mock(LocalResource.class); + final List resources = Collections.singletonList(res); + final Map resMap = new HashMap(); + resMap.put("foo.jar", res); + + when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars)) + .thenReturn(resources); + when(utils.getBaseName(res)).thenReturn("foo.jar"); + + assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars)); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java index 2600149..2344279 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java @@ -23,11 +23,16 @@ import junit.framework.Assert; import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType; +import org.apache.hadoop.mapred.JobConf; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestTezWork { + 
private static final String MR_JAR_PROPERTY = "tmpjars"; private List nodes; private TezWork work; @@ -156,4 +161,75 @@ public void testGetAllWork() throws Exception { Assert.assertEquals(sorted.get(i), nodes.get(4-i)); } } + + @Test + public void testConfigureJars() throws Exception { + final JobConf conf = new JobConf(); + conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar"); + BaseWork baseWork = Mockito.mock(BaseWork.class); + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + conf.set(MR_JAR_PROPERTY, "file:///tmp/foo2.jar"); + return null; + } + + }).when(baseWork).configureJobConf(conf); + + work.add(baseWork); + work.configureJobConfAndExtractJars(conf); + Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY)); + } + + @Test + public void testConfigureJarsNoExtraJars() throws Exception { + final JobConf conf = new JobConf(); + conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar"); + BaseWork baseWork = Mockito.mock(BaseWork.class); + + work.add(baseWork); + work.configureJobConfAndExtractJars(conf); + Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY)); + } + + @Test + public void testConfigureJarsWithNull() throws Exception { + final JobConf conf = new JobConf(); + conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar"); + BaseWork baseWork = Mockito.mock(BaseWork.class); + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + conf.unset(MR_JAR_PROPERTY); + return null; + } + + }).when(baseWork).configureJobConf(conf); + + work.add(baseWork); + work.configureJobConfAndExtractJars(conf); + Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY)); + } + + @Test + public void testConfigureJarsStartingWithNull() throws Exception { + final JobConf conf = new JobConf(); + conf.unset(MR_JAR_PROPERTY); + BaseWork baseWork = Mockito.mock(BaseWork.class); + Mockito.doAnswer(new 
Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + conf.setStrings(MR_JAR_PROPERTY, "file:///tmp/foo1.jar", "file:///tmp/foo2.jar"); + return null; + } + + }).when(baseWork).configureJobConf(conf); + + work.add(baseWork); + work.configureJobConfAndExtractJars(conf); + Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY)); + } }