diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index d66e2ea..54e3f77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -81,6 +81,8 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.tez.client.PreWarmContext; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; @@ -302,7 +304,7 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS); String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS); String partitionerClassName = conf.get("mapred.partitioner.class"); - Configuration partitionerConf; + Map partitionerConf; EdgeType edgeType = edgeProp.getEdgeType(); switch (edgeType) { @@ -352,12 +354,12 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration * a base configuration to extract relevant properties * @return */ - private Configuration createPartitionerConf(String partitionerClassName, + private Map createPartitionerConf(String partitionerClassName, Configuration baseConf) { - Configuration partitionerConf = new Configuration(false); - partitionerConf.set("mapred.partitioner.class", partitionerClassName); + Map partitionerConf = new HashMap(); + partitionerConf.put("mapred.partitioner.class", partitionerClassName); if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) { - partitionerConf.set("mapreduce.totalorderpartitioner.path", + partitionerConf.put("mapreduce.totalorderpartitioner.path", baseConf.get("mapreduce.totalorderpartitioner.path")); } return partitionerConf; @@ -491,8 +493,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork, mrInput = MRHelpers.createMRInputPayload(serializedConf, null); } map.addDataSource(alias, - new InputDescriptor(MRInputLegacy.class.getName()). - setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput)); + new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()). + setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null)); Map localResources = new HashMap(); localResources.put(getBaseName(appJarLr), appJarLr); @@ -946,9 +948,9 @@ public Vertex createVertex(JobConf conf, BaseWork work, // final vertices need to have at least one output if (!hasChildren) { - v.addDataSink("out_"+work.getName(), + v.addDataSink("out_"+work.getName(), new DataSinkDescriptor( new OutputDescriptor(MROutput.class.getName()) - .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null); + .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null, null)); } return v; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index d95530b..9ebe406 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; @@ -36,7 +35,7 @@ @Override public void cache(String key, Object value) { LOG.info("Adding " + key + " to cache with value " + value); - registry.add(ObjectLifeCycle.VERTEX, key, value); + registry.cacheForVertex(key, value); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java index 726e122..a977319 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java @@ -37,7 +37,7 @@ * Uses a priority queue to pick the KeyValuesReader of the input that is next in * sort order. */ -public class InputMerger implements KeyValuesReader { +public class InputMerger extends KeyValuesReader { public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class); private PriorityQueue pQueue = null;