diff --git pom.xml pom.xml
index b3216e1..157cb0d 100644
--- pom.xml
+++ pom.xml
@@ -146,7 +146,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.4.0-incubating</tez.version>
+    <tez.version>0.5.0-SNAPSHOT</tez.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
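A note on the version bump: 0.5.0-SNAPSHOT is a pre-release Tez build, so this patch only compiles if the Tez artifacts are installed locally (mvn install from a Tez checkout) or a snapshot repository is enabled. A minimal sketch of the latter, assuming the standard Apache snapshot repository (Hive's root pom may already declare it):

    <repository>
      <id>apache.snapshots</id>
      <name>Apache Snapshot Repository</name>
      <url>https://repository.apache.org/snapshots</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>false</enabled>
      </releases>
    </repository>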
diff --git ql/pom.xml ql/pom.xml
index 0729d47..c3e0adb 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -297,6 +297,38 @@
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-mapreduce</artifactId>
       <version>${tez.version}</version>
       <optional>true</optional>
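The tez-runtime-internals dependency reconstructed above is what provides ObjectRegistryImpl, which ObjectCache switches to later in this patch. The org.apache.hadoop exclusions keep ql from pulling Hadoop artifacts transitively through Tez, presumably mirroring the exclusions already carried by the neighboring tez-mapreduce dependency.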
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index 74c5429..397cd81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -33,36 +33,36 @@
import com.google.common.collect.Multimap;
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManager {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration conf = null;
+ EdgeManagerContext context = null;
// used by the framework at runtime. initialize is the real initializer at runtime
public CustomPartitionEdge() {
}
@Override
- public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
- int destinationTaskIndex) {
+ public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
return numSourceTasks;
}
@Override
- public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
- int sourceTaskIndex) {
+ public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
return conf.getNumBuckets();
}
@Override
- public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
- return numDestinationTasks;
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
}
// called at runtime to initialize the custom edge.
@Override
public void initialize(EdgeManagerContext context) {
+ this.context = context;
byte[] payload = context.getUserPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
@@ -91,16 +91,16 @@ public void routeDataMovementEventToDestination(DataMovementEvent event,
}
@Override
-  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
+  public void routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
     List<Integer> destTaskIndices = new ArrayList<Integer>();
-    addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
+    addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), destTaskIndices);
     mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
}
@Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex) {
+ int destinationTaskIndex, int destinationFailedInputIndex) {
return event.getIndex();
}
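The hunks above track EdgeManager changing from an interface to an abstract class: the per-call count arguments are gone, and counts are now read from the EdgeManagerContext captured in initialize(). A minimal sketch of the new shape, assuming the 0.5.0-SNAPSHOT API exactly as this patch uses it (ExampleEdge is hypothetical, the import packages are assumed from the Tez layout of this era, and the remaining abstract routing methods are omitted, hence the abstract modifier):

    import org.apache.tez.dag.api.EdgeManager;
    import org.apache.tez.dag.api.EdgeManagerContext;

    public abstract class ExampleEdge extends EdgeManager {
      private EdgeManagerContext context;

      @Override
      public void initialize(EdgeManagerContext context) {
        this.context = context;  // counts are looked up from the context from now on
      }

      @Override
      public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
        // 0.4 passed numDestinationTasks as an argument; 0.5 asks the context
        return context.getDestinationVertexNumTasks();
      }
    }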
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index cecedfb..06cbd6b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -64,7 +64,7 @@
* Only works with old mapred API
* Will only work with a single MRInput for now.
*/
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
@@ -90,9 +90,9 @@ public void initialize(VertexManagerPluginContext context) {
@Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+        new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
-      scheduledTasks.add(new Integer(i));
+      scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
}
context.scheduleVertexTasks(scheduledTasks);
}
@@ -195,7 +195,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
getBucketSplitMapForPath(pathFileSplitsMap);
try {
- int totalResource = context.getTotalAVailableResource().getMemory();
+ int totalResource = context.getTotalAvailableResource().getMemory();
int taskResource = context.getVertexTaskResource().getMemory();
float waves =
conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
@@ -267,7 +267,7 @@ private void processAllEvents(String inputName,
context.setVertexParallelism(
taskCount,
new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap);
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
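Two more API shifts show up in this file besides the interface-to-class change on VertexManagerPlugin: scheduleVertexTasks() now takes TaskWithLocationHint entries instead of bare task indices, and setVertexParallelism() gained a fourth argument (per-root-input spec updates, going by the snapshot API this patch builds against); passing null keeps the default behavior.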
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e116426..6219ae4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -80,37 +80,32 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
@@ -162,6 +157,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
JobConf conf = new JobConf(baseConf);
if (mapWork.getNumMapTasks() != null) {
+    // Is this required?
conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
}
@@ -200,6 +196,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
inpFormat = CombineHiveInputFormat.class.getName();
}
+    // Is this required?
conf.set("mapred.mapper.class", ExecMapper.class.getName());
conf.set("mapred.input.format.class", inpFormat);
@@ -211,20 +208,19 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
* Edge between them.
*
* @param group The parent VertexGroup
- * @param wConf The job conf of the child vertex
+   * @param vConf The job conf of one of the parent (grouped) vertices
* @param w The child vertex
* @param edgeProp the edge property of connection between the two
* endpoints.
*/
@SuppressWarnings("rawtypes")
- public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
Vertex w, TezEdgeProperty edgeProp)
throws IOException {
Class mergeInputClass;
- LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+ LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
@@ -255,40 +251,23 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
break;
}
- return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
+ return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf),
new InputDescriptor(mergeInputClass.getName()));
}
/**
- * Given two vertices a, b update their configurations to be used in an Edge a-b
- */
- public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
- throws IOException {
-
- // Tez needs to setup output subsequent input pairs correctly
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
- // update payloads (configuration for the vertices might have changed)
- v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
- w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
- }
-
- /**
- * Given two vertices and their respective configuration objects createEdge
+ * Given two vertices and the configuration for the source vertex, createEdge
* will create an Edge object that connects the two.
*
- * @param vConf JobConf of the first vertex
+ * @param vConf JobConf of the first (source) vertex
* @param v The first vertex (source)
- * @param wConf JobConf of the second vertex
* @param w The second vertex (sink)
* @return
*/
- public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
TezEdgeProperty edgeProp)
throws IOException {
- updateConfigurationForEdge(vConf, v, wConf, w);
-
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
@@ -307,31 +286,33 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
// nothing
}
- return new Edge(v, w, createEdgeProperty(edgeProp));
+ return new Edge(v, w, createEdgeProperty(edgeProp, vConf));
}
/*
* Helper function to create an edge property from an edge type.
*/
@SuppressWarnings("rawtypes")
- private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
- DataMovementType dataMovementType;
- Class logicalInputClass;
- Class logicalOutputClass;
+ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
+ throws IOException {
+ MRHelpers.translateVertexConfToTez(conf);
+ String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+ String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+ String partitionerClassName = conf.get("mapred.partitioner.class");
+ Configuration partitionerConf;
- EdgeProperty edgeProperty = null;
EdgeType edgeType = edgeProp.getEdgeType();
switch (edgeType) {
case BROADCAST_EDGE:
- dataMovementType = DataMovementType.BROADCAST;
- logicalOutputClass = OnFileUnorderedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer
+ .newBuilder(keyClass, valClass).setFromConfiguration(conf).build();
+ return et1Conf.createDefaultBroadcastEdgeProperty();
case CUSTOM_EDGE:
- dataMovementType = DataMovementType.CUSTOM;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf).build();
EdgeManagerDescriptor edgeDesc =
new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
CustomEdgeConfiguration edgeConf =
@@ -340,38 +321,43 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
edgeConf.write(dob);
byte[] userPayload = dob.getData();
edgeDesc.setUserPayload(userPayload);
- edgeProperty =
- new EdgeProperty(edgeDesc,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
- break;
-
+ return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
case CUSTOM_SIMPLE_EDGE:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
- logicalInputClass = ShuffledUnorderedKVInput.class;
- break;
-
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf).build();
+ return et3Conf.createDefaultEdgeProperty();
case SIMPLE_EDGE:
default:
- dataMovementType = DataMovementType.SCATTER_GATHER;
- logicalOutputClass = OnFileSortedOutput.class;
- logicalInputClass = ShuffledMergedInputLegacy.class;
- break;
+ assert partitionerClassName != null;
+ partitionerConf = createPartitionerConf(partitionerClassName, conf);
+ OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer
+ .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ .setFromConfiguration(conf).build();
+ return et4Conf.createDefaultEdgeProperty();
}
+ }
- if (edgeProperty == null) {
- edgeProperty =
- new EdgeProperty(dataMovementType,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- new OutputDescriptor(logicalOutputClass.getName()),
- new InputDescriptor(logicalInputClass.getName()));
+ /**
+ * Utility method to create a stripped down configuration for the MR partitioner.
+ *
+ * @param partitionerClassName
+ * the real MR partitioner class name
+ * @param baseConf
+ * a base configuration to extract relevant properties
+ * @return
+ */
+ private Configuration createPartitionerConf(String partitionerClassName,
+ Configuration baseConf) {
+ Configuration partitionerConf = new Configuration(false);
+ partitionerConf.set("mapred.partitioner.class", partitionerClassName);
+ if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+ partitionerConf.set("mapreduce.totalorderpartitioner.path",
+ baseConf.get("mapreduce.totalorderpartitioner.path"));
}
-
- return edgeProperty;
+ return partitionerConf;
}
/*
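The createEdgeProperty() rewrite above drops the hand-assembled EdgeProperty plumbing in favor of the runtime library's edge configurers. A hypothetical usage sketch of the SIMPLE_EDGE path, with Text/IntWritable standing in for the real key/value classes that normally come out of the translated vertex configuration (mapVertex and reduceVertex are assumed to be in scope):

    Configuration partitionerConf =
        createPartitionerConf(conf.get("mapred.partitioner.class"), conf);
    OrderedPartitionedKVEdgeConfigurer cfg = OrderedPartitionedKVEdgeConfigurer
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            MRPartitioner.class.getName(), partitionerConf)
        .setFromConfiguration(conf)   // picks up the runtime settings from the vertex conf
        .build();
    Edge edge = new Edge(mapVertex, reduceVertex, cfg.createDefaultEdgeProperty());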
@@ -423,9 +409,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, mapWork);
- // Tez ask us to call this even if there's no preceding vertex
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// finally create the vertex
Vertex map = null;
@@ -487,7 +470,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     Map<String, String> environment = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
map.setTaskEnvironment(environment);
- map.setJavaOpts(getContainerJavaOpts(conf));
+ map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -500,9 +483,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
} else {
mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
}
- map.addInput(alias,
+ map.addDataSource(alias,
new InputDescriptor(MRInputLegacy.class.getName()).
- setUserPayload(mrInput), amSplitGeneratorClass);
+ setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput));
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -517,7 +500,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
localResources);
}
- map.setTaskLocalResources(localResources);
+ map.setTaskLocalFiles(localResources);
return map;
}
@@ -527,6 +510,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
JobConf conf = new JobConf(baseConf);
+    // Is this required?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -550,9 +534,6 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
- // Call once here, will be updated when we find edges
- MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
// create the vertex
Vertex reducer = new Vertex(reduceWork.getName(),
new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
@@ -565,14 +546,14 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
reducer.setTaskEnvironment(environment);
- reducer.setJavaOpts(getContainerJavaOpts(conf));
+ reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.put(getBaseName(appJarLr), appJarLr);
for (LocalResource lr: additionalLr) {
localResources.put(getBaseName(lr), lr);
}
- reducer.setTaskLocalResources(localResources);
+ reducer.setTaskLocalFiles(localResources);
return reducer;
}
@@ -606,15 +587,13 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
}
/**
- * @param sessionConfig session configuration
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
* @return prewarm context object
*/
-  public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
-      Map<String, LocalResource> localResources) throws IOException, TezException {
-
-    Configuration conf = sessionConfig.getTezConfiguration();
+  public PreWarmContext createPreWarmContext(TezConfiguration conf,
+      int numContainers, Map<String, LocalResource> localResources)
+      throws IOException, TezException {
ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
@@ -624,7 +603,6 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
- combinedResources.putAll(sessionConfig.getSessionResources());
if (localResources != null) {
combinedResources.putAll(localResources);
}
@@ -961,9 +939,9 @@ public Vertex createVertex(JobConf conf, BaseWork work,
// final vertices need to have at least one output
if (!hasChildren) {
- v.addOutput("out_"+work.getName(),
+ v.addDataSink("out_"+work.getName(),
new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null);
}
return v;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index dc99702..75ac3bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -44,10 +44,12 @@
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
@@ -170,13 +172,16 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
}
+  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception {
+  }
+
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
-    RootInputConfigureVertexTasksEvent configureVertexEvent =
-        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-            inputSplitInfo.getTaskLocationHints());
+    RootInputConfigureVertexTasksEvent configureVertexEvent =
+        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
+            inputSplitInfo.getTaskLocationHints(),
+            RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
events.add(configureVertexEvent);
if (sendSerializedEvents) {
@@ -196,4 +201,5 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
}
return events;
}
+
}
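handleInputInitializerEvent() is new boilerplate: the updated TezRootInputInitializer contract can deliver RootInputInitializerEvents to an initializer (note the new import), and Hive has no use for them yet, so the override is deliberately empty. The extra RootInputSpecUpdate argument on RootInputConfigureVertexTasksEvent similarly just opts into the default of a single physical input per task.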
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 318ba8e..d95530b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -22,8 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
-
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
/**
* ObjectCache. Tez implementation based on the tez object registry.
@@ -32,7 +31,7 @@
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
- private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+ private final ObjectRegistry registry = new ObjectRegistryImpl();
@Override
public void cache(String key, Object value) {
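With ObjectRegistryFactory gone from the snapshot API, the cache instantiates ObjectRegistryImpl directly; that class lives in tez-runtime-internals, which is why ql/pom.xml gains that dependency above. One caveat worth flagging in review: if ObjectRegistryImpl is a plain per-instance registry rather than the process-wide one the factory used to return, every ObjectCache now gets its own namespace, which may not match the intended caching semantics.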
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2959fcc..c9484e8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -50,10 +50,8 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -70,7 +68,7 @@
private HiveConf conf;
private Path tezScratchDir;
private LocalResource appJarLr;
- private TezSession session;
+ private TezClient session;
private String sessionId;
private DagUtils utils;
private String queueName;
@@ -153,11 +151,6 @@ public void open(HiveConf conf, String[] additionalFiles)
refreshLocalResourcesFromConf(conf);
- // generate basic tez config
- TezConfiguration tezConfig = new TezConfiguration(conf);
-
- tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
@@ -173,13 +166,12 @@ public void open(HiveConf conf, String[] additionalFiles)
     Map<String, String> amEnv = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
- AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
-
- // configuration for the session
- TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
-
// and finally we're ready to create and start the session
- session = new TezSession("HIVE-" + sessionId, sessionConfig);
+ // generate basic tez config
+ TezConfiguration tezConfig = new TezConfiguration(conf);
+ tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+ session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+ commonLocalResources, null);
LOG.info("Opening new Tez Session (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
@@ -190,7 +182,8 @@ public void open(HiveConf conf, String[] additionalFiles)
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
- PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+ PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
+ commonLocalResources);
try {
session.preWarm(context);
} catch (InterruptedException ie) {
@@ -199,11 +192,16 @@ public void open(HiveConf conf, String[] additionalFiles)
}
}
}
-
+    try {
+      session.waitTillReady();
+    } catch (InterruptedException ie) {
+      // ignore
+    }
// In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
- conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+ conf.set("mapreduce.tez.session.tokill-application-id",
+ session.getAppMasterApplicationId().toString());
openSessions.add(this);
}
@@ -280,7 +278,7 @@ public String getSessionId() {
return sessionId;
}
- public TezSession getSession() {
+ public TezClient getSession() {
return session;
}
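TezSession, TezSessionConfiguration, and AMConfiguration collapse into TezClient here. A rough lifecycle sketch matching the constructor used above (a SNAPSHOT-era signature, so treat it as illustrative; the start() call is assumed to happen elsewhere in this file, outside the visible hunks):

    TezConfiguration tezConf = new TezConfiguration(hiveConf);
    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);

    TezClient client = new TezClient("HIVE-" + sessionId, tezConf,
        true /* session mode */, commonLocalResources, null /* credentials */);
    client.start();          // assumed prerequisite before any submitDAG()
    client.waitTillReady();  // block until the AM can accept a DAG
    DAGClient dagClient = client.submitDAG(dag);
    // ... monitor via dagClient, eventually client.stop();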
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 951e918..e1f5630 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -247,16 +246,14 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
}
VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+ // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
+ // Pick any one source vertex to figure out the Edge configuration.
+ JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+
// now hook up the children
for (BaseWork v: children) {
- // need to pairwise patch up the configuration of the vertices
- for (BaseWork part: unionWorkItems) {
- utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
- workToConf.get(v), workToVertex.get(v));
- }
-
// finally we can create the grouped edge
- GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+ GroupInputEdge e = utils.createEdge(group, parentConf,
workToVertex.get(v), work.getEdgeProperty(w, v));
dag.addEdge(e);
@@ -279,7 +276,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
- e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp);
+ e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
dag.addEdge(e);
}
}
@@ -305,7 +302,8 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
try {
// ready to start execution on the cluster
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ sessionState.getSession().addAppMasterLocalResources(resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
@@ -313,7 +311,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
console.printInfo("Session re-established.");
- dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+ dagClient = sessionState.getSession().submitDAG(dag);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
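Since submitDAG() no longer takes a resource map, the extra local resources are registered on the session up front through addAppMasterLocalResources(), which also means the resubmit after a session reopen reuses the same set. The union-work hunk is the payoff of the DagUtils changes: edge configuration is derived once from a representative parent vertex instead of being pairwise patched into every vertex configuration.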
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 0835bde..84c031c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -54,7 +54,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -75,7 +75,7 @@
ReduceWork[] rws;
TezWork work;
TezTask task;
- TezSession session;
+ TezClient session;
TezSessionState sessionState;
JobConf conf;
LocalResource appLr;
@@ -102,13 +102,13 @@ public Vertex answer(InvocationOnMock invocation) throws Throwable {
}
});
- when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
+ when(utils.createEdge(any(JobConf.class), any(Vertex.class),
         any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
@Override
public Edge answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
- return new Edge((Vertex)args[1], (Vertex)args[3], mock(EdgeProperty.class));
+ return new Edge((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
}
});
@@ -158,10 +158,10 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
appLr = mock(LocalResource.class);
SessionState.start(new HiveConf());
- session = mock(TezSession.class);
+ session = mock(TezClient.class);
sessionState = mock(TezSessionState.class);
when(sessionState.getSession()).thenReturn(session);
- when(session.submitDAG(any(DAG.class), any(Map.class)))
+ when(session.submitDAG(any(DAG.class)))
.thenThrow(new SessionNotRunning(""))
.thenReturn(mock(DAGClient.class));
}
@@ -186,7 +186,7 @@ public void testBuildDag() throws IllegalArgumentException, IOException, Excepti
for (BaseWork x: work.getChildren(w)) {
boolean found = false;
for (Vertex u: outs) {
- if (u.getVertexName().equals(x.getName())) {
+ if (u.getName().equals(x.getName())) {
found = true;
break;
}
@@ -209,7 +209,7 @@ public void testSubmit() throws Exception {
// validate close/reopen
verify(sessionState, times(1)).open(any(HiveConf.class));
verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
- verify(session, times(2)).submitDAG(any(DAG.class), any(Map.class));
+ verify(session, times(2)).submitDAG(any(DAG.class));
}
@Test
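The test updates track the signature changes one-for-one; in particular, args[3] becomes args[2] because createEdge() lost its second JobConf parameter, shifting the sink vertex one position left in the mocked argument list.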