diff --git pom.xml pom.xml
index b3216e1..157cb0d 100644
--- pom.xml
+++ pom.xml
@@ -146,7 +146,7 @@
     1.0.1
     1.7.5
     4.0.4
-    <tez.version>0.4.0-incubating</tez.version>
+    <tez.version>0.5.0-SNAPSHOT</tez.version>
     1.1
     0.2
     1.4
diff --git ql/pom.xml ql/pom.xml
index 0729d47..c3e0adb 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -297,6 +297,38 @@
     <dependency>
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-mapreduce</artifactId>
       <version>${tez.version}</version>
       <optional>true</optional>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index 74c5429..397cd81 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -33,36 +33,36 @@
 import com.google.common.collect.Multimap;
 
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManager {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
 
   CustomEdgeConfiguration conf = null;
+  EdgeManagerContext context = null;
 
   // used by the framework at runtime. initialize is the real initializer at runtime
   public CustomPartitionEdge() {
   }
 
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks,
-      int destinationTaskIndex) {
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks) {
     return numSourceTasks;
   }
 
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks,
-      int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks) {
     return conf.getNumBuckets();
   }
 
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
-    return numDestinationTasks;
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
   // called at runtime to initialize the custom edge.
   @Override
   public void initialize(EdgeManagerContext context) {
+    this.context = context;
     byte[] payload = context.getUserPayload();
     LOG.info("Initializing the edge, payload: " + payload);
     if (payload == null) {
@@ -91,16 +91,16 @@ public void routeDataMovementEventToDestination(DataMovementEvent event,
   }
 
   @Override
-  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
-      int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> mapDestTaskIndices) {
     List<Integer> destTaskIndices = new ArrayList<Integer>();
-    addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
+    addAllDestinationTaskIndices(context.getDestinationVertexNumTasks(), destTaskIndices);
     mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
   }
 
   @Override
   public int routeInputErrorEventToSource(InputReadErrorEvent event,
-      int destinationTaskIndex) {
+      int destinationTaskIndex, int destinationFailedInputIndex) {
     return event.getIndex();
   }
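Reviewer note: in the 0.5 API, EdgeManager becomes an abstract class whose callbacks no longer receive task counts as parameters; those now come from the EdgeManagerContext captured in initialize(). The payload hand-off that initialize() depends on is a plain Hadoop Writable round-trip. A minimal, self-contained sketch of that round-trip (IntWritable stands in for CustomEdgeConfiguration, which is not shown in this patch; the class name PayloadRoundTrip is illustrative only):

```java
import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;

// Minimal sketch of the user-payload round-trip the custom edge relies on.
public class PayloadRoundTrip {
  public static void main(String[] args) throws IOException {
    // DAG-builder side (cf. createEdgeProperty() in DagUtils below):
    IntWritable numBuckets = new IntWritable(4);
    DataOutputBuffer dob = new DataOutputBuffer();
    numBuckets.write(dob);
    byte[] userPayload = dob.getData(); // attached via EdgeManagerDescriptor.setUserPayload()

    // AM side (cf. CustomPartitionEdge.initialize() reading context.getUserPayload()):
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(userPayload, dob.getLength());
    IntWritable restored = new IntWritable();
    restored.readFields(dib);
    System.out.println("numBuckets = " + restored.get()); // prints 4
  }
}
```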
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index cecedfb..06cbd6b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -64,7 +64,7 @@
  * Only works with old mapred API
  * Will only work with a single MRInput for now.
  */
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
 
@@ -90,9 +90,9 @@ public void initialize(VertexManagerPluginContext context) {
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+        new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
-      scheduledTasks.add(new Integer(i));
+      scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
     }
     context.scheduleVertexTasks(scheduledTasks);
   }
@@ -195,7 +195,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       getBucketSplitMapForPath(pathFileSplitsMap);
 
     try {
-      int totalResource = context.getTotalAVailableResource().getMemory();
+      int totalResource = context.getTotalAvailableResource().getMemory();
       int taskResource = context.getVertexTaskResource().getMemory();
 
       float waves = conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
@@ -267,7 +267,7 @@ private void processAllEvents(String inputName,
     context.setVertexParallelism(
         taskCount,
         new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap);
+            .toArray(new InputSplit[finalSplits.size()]))), emMap, null);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
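The scheduling idiom above is the part of the VertexManagerPlugin change that is easiest to get wrong, so here it is in isolation. A hedged sketch, assuming a VertexManagerPluginContext named context as captured in initialize():

```java
// Sketch: Tez 0.5 schedules TaskWithLocationHint objects instead of bare task
// indices. A null hint leaves placement entirely to the scheduler.
private void scheduleAllTasks(VertexManagerPluginContext context) {
  int numTasks = context.getVertexNumTasks(context.getVertexName());
  List<VertexManagerPluginContext.TaskWithLocationHint> tasks =
      new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
  for (int i = 0; i < numTasks; ++i) {
    tasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
  }
  context.scheduleVertexTasks(tasks);
}
```

The extra null argument to setVertexParallelism() appears to be the root-input spec update map in this snapshot API; the patch leaves it unset.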
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e116426..6219ae4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -80,37 +80,32 @@
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
@@ -162,6 +157,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
     if (mapWork.getNumMapTasks() != null) {
+      // Is this required ?
       conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
     }
@@ -200,6 +196,7 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
       inpFormat = CombineHiveInputFormat.class.getName();
     }
 
+    // Is this required ?
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
     conf.set("mapred.input.format.class", inpFormat);
@@ -211,20 +208,19 @@
    * Edge between them.
    *
    * @param group The parent VertexGroup
-   * @param wConf The job conf of the child vertex
+   * @param vConf The job conf of one of the parent (grouped) vertices
    * @param w The child vertex
    * @param edgeProp the edge property of connection between the two
    *          endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
       Vertex w, TezEdgeProperty edgeProp)
     throws IOException {
 
     Class mergeInputClass;
 
-    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
 
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
@@ -255,40 +251,23 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
       break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
+    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf),
         new InputDescriptor(mergeInputClass.getName()));
   }
 
   /**
-   * Given two vertices a, b update their configurations to be used in an Edge a-b
-   */
-  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
-    throws IOException {
-
-    // Tez needs to setup output subsequent input pairs correctly
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
-    // update payloads (configuration for the vertices might have changed)
-    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
-  }
-
-  /**
-   * Given two vertices and their respective configuration objects createEdge
+   * Given two vertices and the configuration for the source vertex, createEdge
    * will create an Edge object that connects the two.
    *
-   * @param vConf JobConf of the first vertex
+   * @param vConf JobConf of the first (source) vertex
    * @param v The first vertex (source)
-   * @param wConf JobConf of the second vertex
   * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
       TezEdgeProperty edgeProp)
     throws IOException {
 
-    updateConfigurationForEdge(vConf, v, wConf, w);
-
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
@@ -307,31 +286,33 @@ public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
       // nothing
     }
 
-    return new Edge(v, w, createEdgeProperty(edgeProp));
+    return new Edge(v, w, createEdgeProperty(edgeProp, vConf));
  }
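Caller-side view of the slimmed-down edge API: the per-edge conf pairs are gone, and only the source-side JobConf feeds the edge configuration. A short sketch; the variable names are illustrative, not from the patch:

```java
// 'utils' is a DagUtils instance; edgeProp comes from TezWork.getEdgeProperty().
GroupInputEdge ge = utils.createEdge(group, parentConf, childVertex, edgeProp);
Edge e = utils.createEdge(srcConf, srcVertex, dstVertex, edgeProp);
dag.addEdge(ge);
dag.addEdge(e);
```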
 
   /*
    * Helper function to create an edge property from an edge type.
    */
   @SuppressWarnings("rawtypes")
-  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
-    DataMovementType dataMovementType;
-    Class logicalInputClass;
-    Class logicalOutputClass;
+  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
+      throws IOException {
+    MRHelpers.translateVertexConfToTez(conf);
+    String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    String partitionerClassName = conf.get("mapred.partitioner.class");
+    Configuration partitionerConf;
 
-    EdgeProperty edgeProperty = null;
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer
+          .newBuilder(keyClass, valClass).setFromConfiguration(conf).build();
+      return et1Conf.createDefaultBroadcastEdgeProperty();
     case CUSTOM_EDGE:
-      dataMovementType = DataMovementType.CUSTOM;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf).build();
       EdgeManagerDescriptor edgeDesc =
           new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
@@ -340,38 +321,43 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOExcep
       edgeConf.write(dob);
       byte[] userPayload = dob.getData();
       edgeDesc.setUserPayload(userPayload);
-      edgeProperty =
-          new EdgeProperty(edgeDesc,
-              DataSourceType.PERSISTED,
-              SchedulingType.SEQUENTIAL,
-              new OutputDescriptor(logicalOutputClass.getName()),
-              new InputDescriptor(logicalInputClass.getName()));
-      break;
-
+      return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
     case CUSTOM_SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf).build();
+      return et3Conf.createDefaultEdgeProperty();
     case SIMPLE_EDGE:
     default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf).build();
+      return et4Conf.createDefaultEdgeProperty();
     }
+  }
 
-    if (edgeProperty == null) {
-      edgeProperty =
-          new EdgeProperty(dataMovementType,
-              DataSourceType.PERSISTED,
-              SchedulingType.SEQUENTIAL,
-              new OutputDescriptor(logicalOutputClass.getName()),
-              new InputDescriptor(logicalInputClass.getName()));
+  /**
+   * Utility method to create a stripped down configuration for the MR partitioner.
+   *
+   * @param partitionerClassName
+   *          the real MR partitioner class name
+   * @param baseConf
+   *          a base configuration to extract relevant properties
+   * @return
+   */
+  private Configuration createPartitionerConf(String partitionerClassName,
+      Configuration baseConf) {
+    Configuration partitionerConf = new Configuration(false);
+    partitionerConf.set("mapred.partitioner.class", partitionerClassName);
+    if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+      partitionerConf.set("mapreduce.totalorderpartitioner.path",
+          baseConf.get("mapreduce.totalorderpartitioner.path"));
     }
-
-    return edgeProperty;
+    return partitionerConf;
   }
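The builder pattern above replaces the explicit Input/Output class wiring of 0.4. Pulled out of the switch, the ordered (shuffle) case looks like this; a sketch assuming vConf is a vertex JobConf already passed through MRHelpers.translateVertexConfToTez(), as in createEdgeProperty():

```java
String keyClass = vConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
String valClass = vConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
OrderedPartitionedKVEdgeConfigurer cfg = OrderedPartitionedKVEdgeConfigurer
    .newBuilder(keyClass, valClass, MRPartitioner.class.getName(),
        createPartitionerConf(vConf.get("mapred.partitioner.class"), vConf))
    .setFromConfiguration(vConf)
    .build();
EdgeProperty shuffle = cfg.createDefaultEdgeProperty(); // scatter-gather shuffle edge
```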
@@ -423,9 +409,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
-    // Tez ask us to call this even if there's no preceding vertex
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // finally create the vertex
     Vertex map = null;
 
@@ -487,7 +470,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     Map<String, String> environment = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
     map.setTaskEnvironment(environment);
-    map.setJavaOpts(getContainerJavaOpts(conf));
+    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
@@ -500,9 +483,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     } else {
       mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
     }
-    map.addInput(alias,
+    map.addDataSource(alias,
         new InputDescriptor(MRInputLegacy.class.getName()).
-            setUserPayload(mrInput), amSplitGeneratorClass);
+            setUserPayload(mrInput),
+        new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -517,7 +500,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
         localResources);
     }
 
-    map.setTaskLocalResources(localResources);
+    map.setTaskLocalFiles(localResources);
     return map;
   }
@@ -527,6 +510,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
   private JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
+    // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -550,9 +534,6 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    // Call once here, will be updated when we find edges
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // create the vertex
     Vertex reducer = new Vertex(reduceWork.getName(),
         new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
@@ -565,14 +546,14 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
     reducer.setTaskEnvironment(environment);
 
-    reducer.setJavaOpts(getContainerJavaOpts(conf));
+    reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
     for (LocalResource lr: additionalLr) {
       localResources.put(getBaseName(lr), lr);
     }
-    reducer.setTaskLocalResources(localResources);
+    reducer.setTaskLocalFiles(localResources);
 
     return reducer;
   }
@@ -606,15 +587,13 @@ private LocalResource createLocalResource(FileSystem remoteFs, Path file,
   }
 
   /**
-   * @param sessionConfig session configuration
    * @param numContainers number of containers to pre-warm
    * @param localResources additional resources to pre-warm with
    * @return prewarm context object
    */
-  public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
-      Map<String, LocalResource> localResources) throws IOException, TezException {
-
-    Configuration conf = sessionConfig.getTezConfiguration();
+  public PreWarmContext createPreWarmContext(TezConfiguration conf, int numContainers,
+      Map<String, LocalResource> localResources) throws IOException, TezException {
 
     ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
     prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
@@ -624,7 +603,6 @@ public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig
 
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
-    combinedResources.putAll(sessionConfig.getSessionResources());
     if (localResources != null) {
       combinedResources.putAll(localResources);
     }
@@ -961,9 +939,9 @@ public Vertex createVertex(JobConf conf, BaseWork work,
 
     // final vertices need to have at least one output
     if (!hasChildren) {
-      v.addOutput("out_"+work.getName(),
+      v.addDataSink("out_"+work.getName(),
           new OutputDescriptor(MROutput.class.getName())
-              .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+              .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null);
     }
 
     return v;
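Summary of the vertex-construction renames in this file, consolidated into one sketch (variables as in createVertex(); the trailing null in addDataSink appears to be an optional output committer descriptor in this snapshot API, an assumption since the patch simply passes null):

```java
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));  // was setJavaOpts()
map.setTaskLocalFiles(localResources);                 // was setTaskLocalResources()
map.addDataSource(alias,                               // was addInput()
    new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(mrInput),
    new InputInitializerDescriptor(amSplitGeneratorClass.getName())
        .setUserPayload(mrInput));
v.addDataSink("out_" + work.getName(),                 // was addOutput()
    new OutputDescriptor(MROutput.class.getName())
        .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
    null);                                             // committer descriptor (assumed), unset here
```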
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index dc99702..75ac3bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -44,10 +44,12 @@
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.RootInputSpecUpdate;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -170,13 +172,16 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
     return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
   }
 
+  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception {
+  }
+
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
 
-    RootInputConfigureVertexTasksEvent configureVertexEvent =
-        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-            inputSplitInfo.getTaskLocationHints());
+    RootInputConfigureVertexTasksEvent configureVertexEvent =
+        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
+            inputSplitInfo.getTaskLocationHints(),
+            RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {
@@ -196,4 +201,5 @@ private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext c
     }
     return events;
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 318ba8e..d95530b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -22,8 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
-
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 
 /**
  * ObjectCache. Tez implementation based on the tez object registry.
@@ -32,7 +31,7 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
 
-  private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+  private final ObjectRegistry registry = new ObjectRegistryImpl();
 
   @Override
   public void cache(String key, Object value) {
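Reviewer note on the ObjectCache change: the old ObjectRegistryFactory handed out a process-wide registry, while new ObjectRegistryImpl() creates a private instance per ObjectCache. If cross-operator sharing within a task is expected, that is worth double-checking. A sketch of the registry interaction; the add/get signatures follow the 0.4-era interface and are assumed unchanged in this snapshot:

```java
ObjectRegistry registry = new ObjectRegistryImpl(); // was ObjectRegistryFactory.getObjectRegistry()
registry.add(ObjectLifeCycle.VERTEX, "map-plan", plan); // retained for the vertex's lifetime
Object cached = registry.get("map-plan");               // lookup by key
```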
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 2959fcc..c9484e8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -50,10 +50,8 @@
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -70,7 +68,7 @@
   private HiveConf conf;
   private Path tezScratchDir;
   private LocalResource appJarLr;
-  private TezSession session;
+  private TezClient session;
   private String sessionId;
   private DagUtils utils;
   private String queueName;
@@ -153,11 +151,6 @@ public void open(HiveConf conf, String[] additionalFiles)
 
     refreshLocalResourcesFromConf(conf);
 
-    // generate basic tez config
-    TezConfiguration tezConfig = new TezConfiguration(conf);
-
-    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
     // unless already installed on all the cluster nodes, we'll have to
     // localize hive-exec.jar as well.
     appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
@@ -173,13 +166,12 @@ public void open(HiveConf conf, String[] additionalFiles)
     Map<String, String> amEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
 
-    AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
-
-    // configuration for the session
-    TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
-
     // and finally we're ready to create and start the session
-    session = new TezSession("HIVE-" + sessionId, sessionConfig);
+    // generate basic tez config
+    TezConfiguration tezConfig = new TezConfiguration(conf);
+    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+    session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+        commonLocalResources, null);
 
     LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")");
 
@@ -190,7 +182,8 @@ public void open(HiveConf conf, String[] additionalFiles)
       int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
       LOG.info("Prewarming " + n + " containers (id: " + sessionId
           + ", scratch dir: " + tezScratchDir + ")");
-      PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+      PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
+          commonLocalResources);
       try {
         session.preWarm(context);
       } catch (InterruptedException ie) {
@@ -199,11 +192,16 @@ public void open(HiveConf conf, String[] additionalFiles)
       }
     }
-
+    try {
+      session.waitTillReady();
+    } catch (InterruptedException ie) {
+      // ignore
+    }
     // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
     // id is used for tez to reuse the current session rather than start a new one.
     conf.set("mapreduce.framework.name", "yarn-tez");
-    conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+    conf.set("mapreduce.tez.session.tokill-application-id",
+        session.getAppMasterApplicationId().toString());
 
     openSessions.add(this);
   }
@@ -280,7 +278,7 @@ public String getSessionId() {
     return sessionId;
   }
 
-  public TezSession getSession() {
+  public TezClient getSession() {
     return session;
   }
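The session lifecycle as exercised by open(), in one place. Only calls that appear in this patch are shown; hiveConf, stagingDir, and resources are assumed inputs:

```java
TezConfiguration tezConf = new TezConfiguration(hiveConf);
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
// constructor args per this patch: name, conf, session mode, AM-local resources, credentials
TezClient session = new TezClient("HIVE-" + sessionId, tezConf, true, resources, null);
try {
  session.waitTillReady(); // block until the AM can accept DAGs
} catch (InterruptedException ie) {
  // ignored, as in open()
}
String appId = session.getAppMasterApplicationId().toString(); // was getApplicationId()
```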
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 951e918..e1f5630 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -247,16 +246,14 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
         }
         VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
 
+        // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
+        // Pick any one source vertex to figure out the Edge configuration.
+        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+
         // now hook up the children
         for (BaseWork v: children) {
-          // need to pairwise patch up the configuration of the vertices
-          for (BaseWork part: unionWorkItems) {
-            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
-                workToConf.get(v), workToVertex.get(v));
-          }
-
           // finally we can create the grouped edge
-          GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+          GroupInputEdge e = utils.createEdge(group, parentConf,
               workToVertex.get(v), work.getEdgeProperty(w, v));
 
           dag.addEdge(e);
@@ -279,7 +276,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
 
-          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
           dag.addEdge(e);
         }
       }
@@ -305,7 +302,8 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
 
     try {
       // ready to start execution on the cluster
-      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+      sessionState.getSession().addAppMasterLocalResources(resourceMap);
+      dagClient = sessionState.getSession().submitDAG(dag);
     } catch (SessionNotRunning nr) {
       console.printInfo("Tez session was closed. Reopening...");
 
@@ -313,7 +311,7 @@ DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
       TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
       console.printInfo("Session re-established.");
 
-      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+      dagClient = sessionState.getSession().submitDAG(dag);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
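The submit path after this change, as a sketch: resources are registered with the AM once up front, then DAGs are submitted without a resource map.

```java
sessionState.getSession().addAppMasterLocalResources(resourceMap);
try {
  dagClient = sessionState.getSession().submitDAG(dag);
} catch (SessionNotRunning nr) {
  // session died: reopen through the pool, then submit again
  TezSessionPoolManager.getInstance().closeAndOpen(sessionState, conf);
  // note: the retry does not re-register resourceMap with the new AM;
  // worth verifying that closeAndOpen() localizes those resources again
  dagClient = sessionState.getSession().submitDAG(dag);
}
```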
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 0835bde..84c031c 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -54,7 +54,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -75,7 +75,7 @@
   ReduceWork[] rws;
   TezWork work;
   TezTask task;
-  TezSession session;
+  TezClient session;
   TezSessionState sessionState;
   JobConf conf;
   LocalResource appLr;
@@ -102,13 +102,13 @@ public Vertex answer(InvocationOnMock invocation) throws Throwable {
       }
     });
 
-    when(utils.createEdge(any(JobConf.class), any(Vertex.class), any(JobConf.class),
+    when(utils.createEdge(any(JobConf.class), any(Vertex.class),
         any(Vertex.class), any(TezEdgeProperty.class))).thenAnswer(new Answer<Edge>() {
       @Override
       public Edge answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
-        return new Edge((Vertex)args[1], (Vertex)args[3], mock(EdgeProperty.class));
+        return new Edge((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
       }
     });
@@ -158,10 +158,10 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
     appLr = mock(LocalResource.class);
 
     SessionState.start(new HiveConf());
-    session = mock(TezSession.class);
+    session = mock(TezClient.class);
     sessionState = mock(TezSessionState.class);
     when(sessionState.getSession()).thenReturn(session);
-    when(session.submitDAG(any(DAG.class), any(Map.class)))
+    when(session.submitDAG(any(DAG.class)))
         .thenThrow(new SessionNotRunning(""))
         .thenReturn(mock(DAGClient.class));
   }
@@ -186,7 +186,7 @@ public void testBuildDag() throws IllegalArgumentException, IOException, Exception
       for (BaseWork x: work.getChildren(w)) {
         boolean found = false;
         for (Vertex u: outs) {
-          if (u.getVertexName().equals(x.getName())) {
+          if (u.getName().equals(x.getName())) {
             found = true;
             break;
           }
@@ -209,7 +209,7 @@ public void testSubmit() throws Exception {
     // validate close/reopen
     verify(sessionState, times(1)).open(any(HiveConf.class));
     verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
-    verify(session, times(2)).submitDAG(any(DAG.class), any(Map.class));
+    verify(session, times(2)).submitDAG(any(DAG.class));
   }
 
   @Test
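The updated stub leans on Mockito's consecutive stubbing: the first submitDAG() call throws SessionNotRunning, the second returns a client, which is exactly the close/reopen path that testSubmit() verifies with times(2). For reference:

```java
when(session.submitDAG(any(DAG.class)))
    .thenThrow(new SessionNotRunning(""))   // first call: simulate a dead session
    .thenReturn(mock(DAGClient.class));     // second call: reopened session succeeds
verify(session, times(2)).submitDAG(any(DAG.class));
```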