diff --git pom.xml pom.xml
index b5a5697..457a178 100644
--- pom.xml
+++ pom.xml
@@ -144,7 +144,7 @@
1.0.1
1.7.5
4.0.4
- 0.4.0-incubating
+ 0.5.0-incubating-SNAPSHOT
1.1
0.2
1.4
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index 74c5429..397cd81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -33,36 +33,36 @@
import com.google.common.collect.Multimap;
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManager {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
+ EdgeManagerContext context = null;
// used by the framework at runtime. initialize is the real initializer at runtime
public CustomPartitionEdge() {
}
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
- int destinationTaskIndex) {
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
return numSourceTasks;
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
- int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
return conf.getNumBuckets();
}
@Override
- public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
- return numDestinationTasks;
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+ return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@Override
public void initialize(EdgeManagerContext context) {
+ this.context = context;
byte[] payload = context.getUserPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
@@ -91,16 +91,16 @@ public void routeDataMovementEventToDestination(DataMovementEvent event,
}
@Override
- public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
- int numDestinationTasks, Map> mapDestTaskIndices) {
+ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map> mapDestTaskIndices) {
List destTaskIndices = new ArrayList();
- addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
+ addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), destTaskIndices);
mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
}
@Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex) {
+ int destinationTaskIndex, int destinationFailedInputIndex) {
return event.getIndex();
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index cecedfb..06cbd6b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -64,7 +64,7 @@
* Only works with old mapred API
* Will only work with a single MRInput for now.
*/
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
@@ -90,9 +90,9 @@ public void initialize(VertexManagerPluginContext context) {
@Override
public void onVertexStarted(Map> completions) {
int numTasks = context.getVertexNumTasks(context.getVertexName());
- List scheduledTasks = new ArrayList(numTasks);
+ List scheduledTasks = new ArrayList(numTasks);
for (int i = 0; i < numTasks; ++i) {
- scheduledTasks.add(new Integer(i));
+ scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
}
context.scheduleVertexTasks(scheduledTasks);
}
@@ -195,7 +195,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
getBucketSplitMapForPath(pathFileSplitsMap);
try {
- int totalResource = context.getTotalAVailableResource().getMemory();
+ int totalResource = context.getTotalAvailableResource().getMemory();
int taskResource = context.getVertexTaskResource().getMemory();
float waves =
conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
@@ -267,7 +267,7 @@ private void processAllEvents(String inputName,
context.setVertexParallelism(
taskCount,
new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap);
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e116426..a9fdc57 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -80,7 +80,6 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -92,10 +91,10 @@
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
@@ -223,7 +222,7 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
Class mergeInputClass;
- LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
EdgeType edgeType = edgeProp.getEdgeType();
@@ -487,7 +486,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
Map environment = new HashMap();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
map.setTaskEnvironment(environment);
- map.setJavaOpts(getContainerJavaOpts(conf));
+ map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -517,7 +516,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
localResources);
}
- map.setTaskLocalResources(localResources);
+ map.setTaskLocalFiles(localResources);
return map;
}
@@ -565,14 +564,14 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
reducer.setTaskEnvironment(environment);
- reducer.setJavaOpts(getContainerJavaOpts(conf));
+ reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Map localResources = new HashMap();
localResources.put(getBaseName(appJarLr), appJarLr);
for (LocalResource lr: additionalLr) {
localResources.put(getBaseName(lr), lr);
}
- reducer.setTaskLocalResources(localResources);
+ reducer.setTaskLocalFiles(localResources);
return reducer;
}
@@ -606,15 +605,13 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
}
/**
- * @param sessionConfig session configuration
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
* @return prewarm context object
*/
- public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
- Map localResources) throws IOException, TezException {
-
- Configuration conf = sessionConfig.getTezConfiguration();
+ public PreWarmContext createPreWarmContext(TezConfiguration conf,
+ int numContainers, Map localResources) throws
+ IOException, TezException {
ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
@@ -624,7 +621,6 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
Map combinedResources = new HashMap();
- combinedResources.putAll(sessionConfig.getSessionResources());
if (localResources != null) {
combinedResources.putAll(localResources);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index dc99702..91b4182 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -44,10 +44,12 @@
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -174,9 +176,9 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
List events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
- RootInputConfigureVertexTasksEvent configureVertexEvent =
- new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
- inputSplitInfo.getTaskLocationHints());
+ RootInputConfigureVertexTasksEvent configureVertexEvent =
+ new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
+ inputSplitInfo.getTaskLocationHints(), RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
@@ -196,4 +198,7 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
}
return events;
}
+
+ public void handleInputInitializerEvent(List events) throws Exception {
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2959fcc..914bfaa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -50,10 +50,8 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -70,7 +68,7 @@
private HiveConf conf;
private Path tezScratchDir;
private LocalResource appJarLr;
- private TezSession session;
+ private TezClient session;
private String sessionId;
private DagUtils utils;
private String queueName;
@@ -153,11 +151,6 @@ public void open(HiveConf conf, String[] additionalFiles)
refreshLocalResourcesFromConf(conf);
- // generate basic tez config
- TezConfiguration tezConfig = new TezConfiguration(conf);
-
- tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
@@ -173,13 +166,12 @@ public void open(HiveConf conf, String[] additionalFiles)
Map amEnv = new HashMap();
MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
- AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
-
- // configuration for the session
- TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
-
// and finally we're ready to create and start the session
- session = new TezSession("HIVE-" + sessionId, sessionConfig);
+ // generate basic tez config
+ TezConfiguration tezConfig = new TezConfiguration(conf);
+ tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+ session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+ commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
@@ -190,7 +182,8 @@ public void open(HiveConf conf, String[] additionalFiles)
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
- PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+ PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
+ commonLocalResources);
try {
session.preWarm(context);
} catch (InterruptedException ie) {
@@ -200,10 +193,12 @@ public void open(HiveConf conf, String[] additionalFiles)
}
}
+ session.waitTillReady();
// In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
- conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+ conf.set("mapreduce.tez.session.tokill-application-id",
+ session.getAppMasterApplicationId().toString());
openSessions.add(this);
}
@@ -280,7 +275,7 @@ public String getSessionId() {
return sessionId;
}
- public TezSession getSession() {
+ public TezClient getSession() {
return session;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 951e918..93793d6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -305,7 +305,8 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
try {
// ready to start execution on the cluster
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ sessionState.getSession().addAppMasterLocalResources(resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
@@ -313,7 +314,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
console.printInfo("Session re-established.");
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 059424d..020b4cf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -1938,7 +1938,7 @@ private void flushStripe() throws IOException {
if (availRatio < paddingTolerance && addBlockPadding) {
long padding = blockSize - (start % blockSize);
byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
- LOG.info(String.format("Padding ORC by %d bytes (<= %0.2f * %d)",
+ LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
padding, availRatio, defaultStripeSize));
start += padding;
while (padding > 0) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 0835bde..f4bd834 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -54,7 +54,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -75,7 +75,7 @@
ReduceWork[] rws;
TezWork work;
TezTask task;
- TezSession session;
+ TezClient session;
TezSessionState sessionState;
JobConf conf;
LocalResource appLr;
@@ -158,10 +158,10 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
appLr = mock(LocalResource.class);
SessionState.start(new HiveConf());
- session = mock(TezSession.class);
+ session = mock(TezClient.class);
sessionState = mock(TezSessionState.class);
when(sessionState.getSession()).thenReturn(session);
- when(session.submitDAG(any(DAG.class), any(Map.class)))
+ when(session.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
.thenReturn(mock(DAGClient.class));
}
@@ -186,7 +186,7 @@ public void testBuildDag() throws IllegalArgumentException, IOException, Excepti
for (BaseWork x: work.getChildren(w)) {
boolean found = false;
for (Vertex u: outs) {
- if (u.getVertexName().equals(x.getName())) {
+ if (u.getName().equals(x.getName())) {
found = true;
break;
}
@@ -209,7 +209,7 @@ public void testSubmit() throws Exception {
// validate close/reopen
verify(sessionState, times(1)).open(any(HiveConf.class));
verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
- verify(session, times(2)).submitDAG(any(DAG.class), any(Map.class));
+ verify(session, times(2)).submitDAG(any(DAG.class));
}
@Test