diff --git pom.xml pom.xml
index 457a178..811da32 100644
--- pom.xml
+++ pom.xml
@@ -144,7 +144,7 @@
1.0.1
1.7.5
4.0.4
- 0.5.0-incubating-SNAPSHOT
+ 0.5.0-SNAPSHOT
1.1
0.2
1.4
diff --git ql/pom.xml ql/pom.xml
index 0729d47..c3e0adb 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -297,6 +297,38 @@
org.apache.tez
+ tez-runtime-internals
+ ${tez.version}
+ true
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-jobclient
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-common
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
+
+
+
+ org.apache.tez
tez-mapreduce
${tez.version}
true
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index 399774c..69324a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -31,6 +31,8 @@
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import com.google.common.collect.Multimap;
+
public class CustomPartitionEdge extends EdgeManager {
private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
@@ -39,9 +41,11 @@
EdgeManagerContext context = null;
// used by the framework at runtime. initialize is the real initializer at runtime
- public CustomPartitionEdge() {
+ public CustomPartitionEdge(EdgeManagerContext context) {
+ super(context);
}
+
@Override
public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
return context.getSourceVertexNumTasks();
@@ -59,8 +63,7 @@ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
// called at runtime to initialize the custom edge.
@Override
- public void initialize(EdgeManagerContext context) {
- this.context = context;
+ public void initialize() {
byte[] payload = context.getUserPayload();
LOG.info("Initializing the edge, payload: " + payload);
if (payload == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 0aa80f0..407d8ac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -82,12 +82,13 @@
private final SplitGrouper grouper = new SplitGrouper();
private int taskCount = 0;
- public CustomPartitionVertex() {
+ public CustomPartitionVertex(VertexManagerPluginContext context) {
+ super(context);
}
@Override
- public void initialize(VertexManagerPluginContext context) {
- this.context = context;
+ public void initialize() {
+ this.context = getContext();
ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
this.numBuckets = byteBuf.getInt();
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 209686d..d66e2ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -80,19 +80,20 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
@@ -103,6 +104,7 @@
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
@@ -297,8 +299,8 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
throws IOException {
MRHelpers.translateVertexConfToTez(conf);
- String keyClass = conf.get(TezJobConfig.TEZ_RUNTIME_KEY_CLASS);
- String valClass = conf.get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS);
+ String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+ String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
String partitionerClassName = conf.get("mapred.partitioner.class");
Configuration partitionerConf;
@@ -488,10 +490,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
} else {
mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
}
-
- map.addInput(alias,
+ map.addDataSource(alias,
new InputDescriptor(MRInputLegacy.class.getName()).
- setUserPayload(mrInput), amSplitGeneratorClass);
+ setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput));
Map localResources = new HashMap();
localResources.put(getBaseName(appJarLr), appJarLr);
@@ -501,7 +502,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
if (inputSplitInfo != null) {
// only relevant for client-side split generation
- map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+ map.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
localResources);
}
@@ -945,9 +946,9 @@ public Vertex createVertex(JobConf conf, BaseWork work,
// final vertices need to have at least one output
if (!hasChildren) {
- v.addOutput("out_"+work.getName(),
+ v.addDataSink("out_"+work.getName(),
new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null);
}
return v;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
index c756e72..7a42b93 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
@@ -48,7 +49,7 @@
*
* @see Config for configuring the HivePreWarmProcessor
*/
-public class HivePreWarmProcessor implements LogicalIOProcessor {
+public class HivePreWarmProcessor extends AbstractLogicalIOProcessor {
private static boolean prewarmed = false;
@@ -56,9 +57,13 @@
private Configuration conf;
+ public HivePreWarmProcessor(TezProcessorContext context) {
+ super(context);
+ }
+
@Override
- public void initialize(TezProcessorContext processorContext)
- throws Exception {
+ public void initialize() throws Exception {
+ TezProcessorContext processorContext = getContext();
byte[] userPayload = processorContext.getUserPayload();
this.conf = TezUtils.createConfFromUserPayload(userPayload);
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 8c6e6b7..857d48b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -62,14 +62,19 @@
* making sure that splits from different partitions are only grouped if they
* are of the same schema, format and serde
*/
-public class HiveSplitGenerator implements TezRootInputInitializer {
+public class HiveSplitGenerator extends TezRootInputInitializer {
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
private static final SplitGrouper grouper = new SplitGrouper();
+ public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
+ super(initializerContext);
+ }
+
@Override
- public List initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+ public List initialize() throws Exception {
+ TezRootInputInitializerContext rootInputContext = getContext();
MRInputUserPayloadProto userPayloadProto =
MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
@@ -175,6 +180,9 @@
return groupedSplits;
}
+ public void handleInputInitializerEvent(List events) throws Exception {
+ }
+
private List createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
List events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
@@ -202,7 +210,4 @@
}
return events;
}
-
- public void handleInputInitializerEvent(List events) throws Exception {
- }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
index 31f3bcd..e388a0d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class MapTezProcessor extends TezProcessor {
- public MapTezProcessor(){
- super(true);
+
+ public MapTezProcessor(TezProcessorContext context) {
+ super(context);
+ this.isMap = true;
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 318ba8e..d95530b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -22,8 +22,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
-
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
/**
* ObjectCache. Tez implementation based on the tez object registry.
@@ -32,7 +31,7 @@
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
- private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+ private final ObjectRegistry registry = new ObjectRegistryImpl();
@Override
public void cache(String key, Object value) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
index 7152aae..c79444e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
/**
* Subclass that is used to indicate if this is a map or reduce process
*/
public class ReduceTezProcessor extends TezProcessor {
- public ReduceTezProcessor(){
- super(false);
+
+ public ReduceTezProcessor(TezProcessorContext context) {
+ super(context);
+ this.isMap = false;
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 6839e34..ea3770d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -33,6 +33,7 @@
import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
@@ -44,12 +45,12 @@
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
*/
-public class TezProcessor implements LogicalIOProcessor {
+public class TezProcessor extends AbstractLogicalIOProcessor {
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
- private boolean isMap = false;
+ protected boolean isMap = false;
RecordProcessor rproc = null;
@@ -69,8 +70,8 @@
jobIdFormat.setMinimumIntegerDigits(4);
}
- public TezProcessor(boolean isMap) {
- this.isMap = isMap;
+ public TezProcessor(TezProcessorContext context) {
+ super(context);
}
@Override
@@ -86,8 +87,8 @@ public void handleEvents(List arg0) {
}
@Override
- public void initialize(TezProcessorContext processorContext)
- throws IOException {
+ public void initialize() throws IOException {
+ TezProcessorContext processorContext = getContext();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
this.processorContext = processorContext;
//get the jobconf
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 914bfaa..c9484e8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -192,8 +192,11 @@ public void open(HiveConf conf, String[] additionalFiles)
}
}
}
-
- session.waitTillReady();
+ try {
+ session.waitTillReady();
+ } catch(InterruptedException ie) {
+ // Interrupt during session-ready wait is ignored (best-effort startup); NOTE(review): should call Thread.currentThread().interrupt() to preserve interrupt status
+ }
// In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
index cc4477f..d9139b8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.hive.ql.exec.tez.tools;
import java.util.IdentityHashMap;
+import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezMergedInputContext;
/**
* TezMergedLogicalInput is an adapter to make union input look like
@@ -31,7 +33,11 @@
public class TezMergedLogicalInput extends MergedLogicalInput {
private Map readyInputs = new IdentityHashMap();
-
+
+ public TezMergedLogicalInput(TezMergedInputContext context, List inputs) {
+ super(context, inputs);
+ }
+
@Override
public Reader getReader() throws Exception {
return new InputMerger(getInputs());