diff --git pom.xml pom.xml index b5a5697..457a178 100644 --- pom.xml +++ pom.xml @@ -144,7 +144,7 @@ 1.0.1 1.7.5 4.0.4 - 0.4.0-incubating + 0.5.0-incubating-SNAPSHOT 1.1 0.2 1.4 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java index 74c5429..2e036b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java @@ -33,36 +33,36 @@ import com.google.common.collect.Multimap; -public class CustomPartitionEdge implements EdgeManager { +public class CustomPartitionEdge extends EdgeManager { private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName()); CustomEdgeConfiguration conf = null; + EdgeManagerContext context = null; // used by the framework at runtime. initialize is the real initializer at runtime public CustomPartitionEdge() { } @Override - public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, - int destinationTaskIndex) { + public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) { return numSourceTasks; } @Override - public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, - int sourceTaskIndex) { + public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) { return conf.getNumBuckets(); } @Override - public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) { - return numDestinationTasks; + public int getNumDestinationConsumerTasks(int sourceTaskIndex) { + return context.getDestinationVertexNumTasks(); /* consumer count is the destination vertex's task count (what the removed numDestinationTasks parameter carried), not the source task index */ } // called at runtime to initialize the custom edge. 
@Override public void initialize(EdgeManagerContext context) { + this.context = context; byte[] payload = context.getUserPayload(); LOG.info("Initializing the edge, payload: " + payload); if (payload == null) { @@ -91,10 +91,10 @@ public void routeDataMovementEventToDestination(DataMovementEvent event, } @Override - public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, - int numDestinationTasks, Map> mapDestTaskIndices) { + public void routeInputSourceTaskFailedEventToDestination(int + sourceTaskIndex, Map> mapDestTaskIndices) { List destTaskIndices = new ArrayList(); - addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices); + addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), destTaskIndices); mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index cecedfb..06cbd6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -64,7 +64,7 @@ * Only works with old mapred API * Will only work with a single MRInput for now. 
*/ -public class CustomPartitionVertex implements VertexManagerPlugin { +public class CustomPartitionVertex extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName()); @@ -90,9 +90,9 @@ public void initialize(VertexManagerPluginContext context) { @Override public void onVertexStarted(Map> completions) { int numTasks = context.getVertexNumTasks(context.getVertexName()); - List scheduledTasks = new ArrayList(numTasks); + List scheduledTasks = new ArrayList(numTasks); for (int i = 0; i < numTasks; ++i) { - scheduledTasks.add(new Integer(i)); + scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null)); } context.scheduleVertexTasks(scheduledTasks); } @@ -195,7 +195,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr getBucketSplitMapForPath(pathFileSplitsMap); try { - int totalResource = context.getTotalAVailableResource().getMemory(); + int totalResource = context.getTotalAvailableResource().getMemory(); int taskResource = context.getVertexTaskResource().getMemory(); float waves = conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES, @@ -267,7 +267,7 @@ private void processAllEvents(String inputName, context.setVertexParallelism( taskCount, new VertexLocationHint(grouper.createTaskLocationHints(finalSplits - .toArray(new InputSplit[finalSplits.size()]))), emMap); + .toArray(new InputSplit[finalSplits.size()]))), emMap, null); // Set the actual events for the tasks. 
context.addRootInputEvents(inputName, taskEvents); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 2e9728a..2c58987 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.client.PreWarmContext; -import org.apache.tez.client.TezSessionConfiguration; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeManagerDescriptor; @@ -91,10 +90,10 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; -import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; @@ -596,15 +595,13 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file, } /** - * @param sessionConfig session configuration * @param numContainers number of containers to pre-warm * @param localResources additional resources to pre-warm with * @return prewarm context object */ - public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers, - Map localResources) throws IOException, TezException { - - Configuration conf = sessionConfig.getTezConfiguration(); + public PreWarmContext createPreWarmContext(TezConfiguration conf, + int numContainers, Map localResources) throws + IOException, TezException { ProcessorDescriptor prewarmProcDescriptor = new 
ProcessorDescriptor(HivePreWarmProcessor.class.getName()); prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf)); @@ -614,7 +611,6 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig Map combinedResources = new HashMap(); - combinedResources.putAll(sessionConfig.getSessionResources()); if (localResources != null) { combinedResources.putAll(localResources); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index dc99702..94b2a20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -44,6 +44,7 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.RootInputSpecUpdate; import org.apache.tez.runtime.api.TezRootInputInitializer; import org.apache.tez.runtime.api.TezRootInputInitializerContext; import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent; @@ -174,9 +175,9 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c List events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1); - RootInputConfigureVertexTasksEvent configureVertexEvent = - new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(), - inputSplitInfo.getTaskLocationHints()); + RootInputConfigureVertexTasksEvent configureVertexEvent = new + RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(), + inputSplitInfo.getTaskLocationHints(), RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate()); events.add(configureVertexEvent); if (sendSerializedEvents) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2959fcc..914bfaa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -50,10 +50,8 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.tez.client.AMConfiguration; import org.apache.tez.client.PreWarmContext; -import org.apache.tez.client.TezSession; -import org.apache.tez.client.TezSessionConfiguration; +import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -70,7 +68,7 @@ private HiveConf conf; private Path tezScratchDir; private LocalResource appJarLr; - private TezSession session; + private TezClient session; private String sessionId; private DagUtils utils; private String queueName; @@ -153,11 +151,6 @@ public void open(HiveConf conf, String[] additionalFiles) refreshLocalResourcesFromConf(conf); - // generate basic tez config - TezConfiguration tezConfig = new TezConfiguration(conf); - - tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - // unless already installed on all the cluster nodes, we'll have to // localize hive-exec.jar as well. 
appJarLr = createJarLocalResource(utils.getExecJarPathLocal()); @@ -173,13 +166,12 @@ public void open(HiveConf conf, String[] additionalFiles) Map amEnv = new HashMap(); MRHelpers.updateEnvironmentForMRAM(conf, amEnv); - AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null); - - // configuration for the session - TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig); - // and finally we're ready to create and start the session - session = new TezSession("HIVE-" + sessionId, sessionConfig); + // generate basic tez config + TezConfiguration tezConfig = new TezConfiguration(conf); + tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); + session = new TezClient("HIVE-" + sessionId, tezConfig, true, + commonLocalResources, null); LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); @@ -190,7 +182,8 @@ public void open(HiveConf conf, String[] additionalFiles) int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS); LOG.info("Prewarming " + n + " containers (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); - PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources); + PreWarmContext context = utils.createPreWarmContext(tezConfig, n, + commonLocalResources); try { session.preWarm(context); } catch (InterruptedException ie) { @@ -200,10 +193,12 @@ public void open(HiveConf conf, String[] additionalFiles) } } + session.waitTillReady(); // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session // id is used for tez to reuse the current session rather than start a new one. 
conf.set("mapreduce.framework.name", "yarn-tez"); - conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString()); + conf.set("mapreduce.tez.session.tokill-application-id", + session.getAppMasterApplicationId().toString()); openSessions.add(this); } @@ -280,7 +275,7 @@ public String getSessionId() { return sessionId; } - public TezSession getSession() { + public TezClient getSession() { return session; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 949bcfb..4e08750 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -301,7 +301,8 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, try { // ready to start execution on the cluster - dagClient = sessionState.getSession().submitDAG(dag, resourceMap); + sessionState.getSession().addAppMasterLocalResources(resourceMap); + dagClient = sessionState.getSession().submitDAG(dag); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. 
Reopening..."); @@ -309,7 +310,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir, TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf); console.printInfo("Session re-established."); - dagClient = sessionState.getSession().submitDAG(dag, resourceMap); + dagClient = sessionState.getSession().submitDAG(dag); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 43125f7..57532af 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -53,7 +53,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.client.TezSession; +import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeProperty; @@ -74,7 +74,7 @@ ReduceWork[] rws; TezWork work; TezTask task; - TezSession session; + TezClient session; TezSessionState sessionState; JobConf conf; LocalResource appLr; @@ -156,10 +156,10 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { conf = new JobConf(); appLr = mock(LocalResource.class); - session = mock(TezSession.class); + session = mock(TezClient.class); sessionState = mock(TezSessionState.class); when(sessionState.getSession()).thenReturn(session); - when(session.submitDAG(any(DAG.class), any(Map.class))) + when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) .thenReturn(mock(DAGClient.class)); } @@ -206,7 +206,7 @@ public void testSubmit() throws Exception { // validate close/reopen verify(sessionState, times(1)).open(any(HiveConf.class)); verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043 - verify(session, 
times(2)).submitDAG(any(DAG.class), any(Map.class)); + verify(session, times(2)).submitDAG(any(DAG.class)); } @Test