diff --git pom.xml pom.xml index 457a178..811da32 100644 --- pom.xml +++ pom.xml @@ -144,7 +144,7 @@ 1.0.1 1.7.5 4.0.4 - 0.5.0-incubating-SNAPSHOT + 0.5.0-SNAPSHOT 1.1 0.2 1.4 diff --git ql/pom.xml ql/pom.xml index 0729d47..c3e0adb 100644 --- ql/pom.xml +++ ql/pom.xml @@ -297,6 +297,38 @@ org.apache.tez + tez-runtime-internals + ${tez.version} + true + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + + + org.apache.hadoop + hadoop-mapreduce-client-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-yarn-client + + + + + org.apache.tez tez-mapreduce ${tez.version} true diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java index 399774c..69324a8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java @@ -31,6 +31,8 @@ import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; +import com.google.common.collect.Multimap; + public class CustomPartitionEdge extends EdgeManager { private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName()); @@ -39,9 +41,11 @@ EdgeManagerContext context = null; // used by the framework at runtime. initialize is the real initializer at runtime - public CustomPartitionEdge() { + public CustomPartitionEdge(EdgeManagerContext context) { + super(context); } + @Override public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) { return context.getSourceVertexNumTasks(); @@ -59,8 +63,7 @@ public int getNumDestinationConsumerTasks(int sourceTaskIndex) { // called at runtime to initialize the custom edge. @Override - public void initialize(EdgeManagerContext context) { - this.context = context; + public void initialize() { byte[] payload = context.getUserPayload(); LOG.info("Initializing the edge, payload: " + payload); if (payload == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java index 0aa80f0..407d8ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java @@ -82,12 +82,13 @@ private final SplitGrouper grouper = new SplitGrouper(); private int taskCount = 0; - public CustomPartitionVertex() { + public CustomPartitionVertex(VertexManagerPluginContext context) { + super(context); } @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { + this.context = getContext(); ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload()); this.numBuckets = byteBuf.getInt(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 209686d..d66e2ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -80,19 +80,20 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.tez.client.PreWarmContext; -import org.apache.tez.common.TezJobConfig; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; +import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; @@ -103,6 +104,7 @@ import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.runtime.api.TezRootInputInitializer; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer; @@ -297,8 +299,8 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf) throws IOException { MRHelpers.translateVertexConfToTez(conf); - String keyClass = conf.get(TezJobConfig.TEZ_RUNTIME_KEY_CLASS); - String valClass = conf.get(TezJobConfig.TEZ_RUNTIME_VALUE_CLASS); + String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); + String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); String partitionerClassName = conf.get("mapred.partitioner.class"); Configuration partitionerConf; @@ -488,10 +490,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, } else { mrInput = MRHelpers.createMRInputPayload(serializedConf, null); } - - map.addInput(alias, + map.addDataSource(alias, new InputDescriptor(MRInputLegacy.class.getName()). - setUserPayload(mrInput), amSplitGeneratorClass); + setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput)); Map localResources = new HashMap(); localResources.put(getBaseName(appJarLr), appJarLr); @@ -501,7 +502,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, if (inputSplitInfo != null) { // only relevant for client-side split generation - map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints()); + map.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints())); MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo, localResources); } @@ -945,9 +946,9 @@ public Vertex createVertex(JobConf conf, BaseWork work, // final vertices need to have at least one output if (!hasChildren) { - v.addOutput("out_"+work.getName(), + v.addDataSink("out_"+work.getName(), new OutputDescriptor(MROutput.class.getName()) - .setUserPayload(MRHelpers.createUserPayloadFromConf(conf))); + .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null); } return v; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java index c756e72..7a42b93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java @@ -25,6 +25,7 @@ import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.tez.common.TezUtils; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; @@ -48,7 +49,7 @@ * * @see Config for configuring the HivePreWarmProcessor */ -public class HivePreWarmProcessor implements LogicalIOProcessor { +public class HivePreWarmProcessor extends AbstractLogicalIOProcessor { private static boolean prewarmed = false; @@ -56,9 +57,13 @@ private Configuration conf; + public HivePreWarmProcessor(TezProcessorContext context) { + super(context); + } + @Override - public void initialize(TezProcessorContext processorContext) - throws Exception { + public void initialize() throws Exception { + TezProcessorContext processorContext = getContext(); byte[] userPayload = processorContext.getUserPayload(); this.conf = TezUtils.createConfFromUserPayload(userPayload); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 8c6e6b7..857d48b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -62,14 +62,19 @@ * making sure that splits from different partitions are only grouped if they * are of the same schema, format and serde */ -public class HiveSplitGenerator implements TezRootInputInitializer { +public class HiveSplitGenerator extends TezRootInputInitializer { private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class); private static final SplitGrouper grouper = new SplitGrouper(); + public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) { + super(initializerContext); + } + @Override - public List initialize(TezRootInputInitializerContext rootInputContext) throws Exception { + public List initialize() throws Exception { + TezRootInputInitializerContext rootInputContext = getContext(); MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload()); @@ -175,6 +180,9 @@ return groupedSplits; } + public void handleInputInitializerEvent(List events) throws Exception { + } + private List createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) { List events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1); @@ -202,7 +210,4 @@ } return events; } - - public void handleInputInitializerEvent(List events) throws Exception { - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java index 31f3bcd..e388a0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.tez.runtime.api.TezProcessorContext; + /** * Subclass that is used to indicate if this is a map or reduce process */ public class MapTezProcessor extends TezProcessor { - public MapTezProcessor(){ - super(true); + + public MapTezProcessor(TezProcessorContext context) { + super(context); + this.isMap = true; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index 318ba8e..d95530b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -22,8 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory; - +import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; /** * ObjectCache. Tez implementation based on the tez object registry. @@ -32,7 +31,7 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName()); - private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry(); + private final ObjectRegistry registry = new ObjectRegistryImpl(); @Override public void cache(String key, Object value) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java index 7152aae..c79444e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java @@ -17,11 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import org.apache.tez.runtime.api.TezProcessorContext; + /** * Subclass that is used to indicate if this is a map or reduce process */ public class ReduceTezProcessor extends TezProcessor { - public ReduceTezProcessor(){ - super(false); + + public ReduceTezProcessor(TezProcessorContext context) { + super(context); + this.isMap = false; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 6839e34..ea3770d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -33,6 +33,7 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.processor.MRTaskReporter; +import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; @@ -44,12 +45,12 @@ * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. */ -public class TezProcessor implements LogicalIOProcessor { +public class TezProcessor extends AbstractLogicalIOProcessor { private static final Log LOG = LogFactory.getLog(TezProcessor.class); - private boolean isMap = false; + protected boolean isMap = false; RecordProcessor rproc = null; @@ -69,8 +70,8 @@ jobIdFormat.setMinimumIntegerDigits(4); } - public TezProcessor(boolean isMap) { - this.isMap = isMap; + public TezProcessor(TezProcessorContext context) { + super(context); } @Override @@ -86,8 +87,8 @@ public void handleEvents(List arg0) { } @Override - public void initialize(TezProcessorContext processorContext) - throws IOException { + public void initialize() throws IOException { + TezProcessorContext processorContext = getContext(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR); this.processorContext = processorContext; //get the jobconf diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 914bfaa..c9484e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -192,8 +192,11 @@ public void open(HiveConf conf, String[] additionalFiles) } } } - - session.waitTillReady(); + try { + session.waitTillReady(); + } catch(InterruptedException ie) { + //ignore + } // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session // id is used for tez to reuse the current session rather than start a new one. conf.set("mapreduce.framework.name", "yarn-tez"); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java index cc4477f..d9139b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hive.ql.exec.tez.tools; import java.util.IdentityHashMap; +import java.util.List; import java.util.Map; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezMergedInputContext; /** * TezMergedLogicalInput is an adapter to make union input look like @@ -31,7 +33,11 @@ public class TezMergedLogicalInput extends MergedLogicalInput { private Map readyInputs = new IdentityHashMap(); - + + public TezMergedLogicalInput(TezMergedInputContext context, List inputs) { + super(context, inputs); + } + @Override public Reader getReader() throws Exception { return new InputMerger(getInputs());