diff --git pom.xml pom.xml
index b5a5697..457a178 100644
--- pom.xml
+++ pom.xml
@@ -144,7 +144,7 @@
1.0.1
1.7.5
4.0.4
- 0.4.0-incubating
+ 0.5.0-incubating-SNAPSHOT
1.1
0.2
1.4
diff --git ql/pom.xml ql/pom.xml
index 3ed8e57..c642d8c 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -327,6 +327,21 @@
+
+ org.apache.tez
+ tez-api
+ ${tez.version}
+
+
+ org.apache.tez
+ tez-runtime-library
+ ${tez.version}
+
+
+ org.apache.tez
+ tez-mapreduce
+ ${tez.version}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index 74c5429..2e036b9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -33,36 +33,36 @@
import com.google.common.collect.Multimap;
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManager {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
+ EdgeManagerContext context = null;
// used by the framework at runtime. initialize is the real initializer at runtime
public CustomPartitionEdge() {
}
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
- int destinationTaskIndex) {
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
return numSourceTasks;
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
- int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
return conf.getNumBuckets();
}
@Override
- public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
- return numDestinationTasks;
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@Override
public void initialize(EdgeManagerContext context) {
+ this.context = context;
byte[] payload = context.getUserPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
@@ -91,10 +91,10 @@ public void routeDataMovementEventToDestination(DataMovementEvent event,
}
@Override
- public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
- int numDestinationTasks, Map> mapDestTaskIndices) {
+ public void routeInputSourceTaskFailedEventToDestination(int
+ sourceTaskIndex, Map> mapDestTaskIndices) {
List destTaskIndices = new ArrayList();
- addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
+ addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), destTaskIndices);
mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index cecedfb..06cbd6b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -64,7 +64,7 @@
* Only works with old mapred API
* Will only work with a single MRInput for now.
*/
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
@@ -267,7 +267,7 @@ private void processAllEvents(String inputName,
context.setVertexParallelism(
taskCount,
new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap);
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 2e9728a..2c58987 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -79,7 +79,6 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -91,10 +90,10 @@
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -596,15 +595,13 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
}
/**
- * @param sessionConfig session configuration
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
* @return prewarm context object
*/
- public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
- Map localResources) throws IOException, TezException {
-
- Configuration conf = sessionConfig.getTezConfiguration();
+ public PreWarmContext createPreWarmContext(TezConfiguration conf,
+ int numContainers, Map localResources) throws
+ IOException, TezException {
ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
@@ -614,7 +611,6 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
Map combinedResources = new HashMap();
- combinedResources.putAll(sessionConfig.getSessionResources());
if (localResources != null) {
combinedResources.putAll(localResources);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index dc99702..94b2a20 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -44,6 +44,7 @@
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
@@ -174,9 +175,9 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
List events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
- RootInputConfigureVertexTasksEvent configureVertexEvent =
- new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints());
+ RootInputConfigureVertexTasksEvent configureVertexEvent = new
+ RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
+ inputSplitInfo.getTaskLocationHints(), RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2959fcc..94d057f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -50,10 +50,8 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -70,7 +68,7 @@
private HiveConf conf;
private Path tezScratchDir;
private LocalResource appJarLr;
- private TezSession session;
+ private TezClient session;
private String sessionId;
private DagUtils utils;
private String queueName;
@@ -173,13 +171,9 @@ public void open(HiveConf conf, String[] additionalFiles)
Map amEnv = new HashMap();
MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
- AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
-
- // configuration for the session
- TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
-
// and finally we're ready to create and start the session
- session = new TezSession("HIVE-" + sessionId, sessionConfig);
+ session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+ commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
@@ -190,7 +184,8 @@ public void open(HiveConf conf, String[] additionalFiles)
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
- PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+ PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
+ commonLocalResources);
try {
session.preWarm(context);
} catch (InterruptedException ie) {
@@ -203,7 +198,10 @@ public void open(HiveConf conf, String[] additionalFiles)
// In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
- conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+
+ //https://issues.apache.org/jira/browse/TEZ-692 appId wont be available before submitting DAG.
+ //conf.set("mapreduce.tez.session.tokill-application-id",
+ //session.getApplicationId().toString());
openSessions.add(this);
}
@@ -280,7 +278,7 @@ public String getSessionId() {
return sessionId;
}
- public TezSession getSession() {
+ public TezClient getSession() {
return session;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 949bcfb..4e08750 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -301,7 +301,8 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
try {
// ready to start execution on the cluster
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ sessionState.getSession().addAppMasterLocalResources(resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
@@ -309,7 +310,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
console.printInfo("Session re-established.");
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 43125f7..57532af 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -53,7 +53,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -74,7 +74,7 @@
ReduceWork[] rws;
TezWork work;
TezTask task;
- TezSession session;
+ TezClient session;
TezSessionState sessionState;
JobConf conf;
LocalResource appLr;
@@ -156,10 +156,10 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
conf = new JobConf();
appLr = mock(LocalResource.class);
- session = mock(TezSession.class);
+ session = mock(TezClient.class);
sessionState = mock(TezSessionState.class);
when(sessionState.getSession()).thenReturn(session);
- when(session.submitDAG(any(DAG.class), any(Map.class)))
+ when(session.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
.thenReturn(mock(DAGClient.class));
}
@@ -206,7 +206,7 @@ public void testSubmit() throws Exception {
// validate close/reopen
verify(sessionState, times(1)).open(any(HiveConf.class));
verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
- verify(session, times(2)).submitDAG(any(DAG.class), any(Map.class));
+ verify(session, times(2)).submitDAG(any(DAG.class));
}
@Test