diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
index d6ca845..359a4d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
@@ -19,29 +19,28 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge extends EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
 
   CustomEdgeConfiguration conf = null;
-  EdgeManagerContext context = null;
+  final EdgeManagerPluginContext context;
 
   // used by the framework at runtime. initialize is the real initializer at runtime
-  public CustomPartitionEdge(EdgeManagerContext context) {
+  public CustomPartitionEdge(EdgeManagerPluginContext context) {
     super(context);
     this.context = context;
   }
 
@@ -65,17 +64,17 @@ public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
 
   // called at runtime to initialize the custom edge.
   @Override
   public void initialize() {
-    byte[] payload = context.getUserPayload();
+    ByteBuffer payload = context.getUserPayload().getPayload();
     LOG.info("Initializing the edge, payload: " + payload);
     if (payload == null) {
      throw new RuntimeException("Invalid payload");
    }
     // De-serialization code
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(payload, payload.length);
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
     conf = new CustomEdgeConfiguration();
     try {
-      conf.readFields(dib);
+      conf.readFields(dibb);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
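The hunk above is the read side of the new payload contract: Tez 0.5 replaces the raw byte[] user payload with a UserPayload wrapping a ByteBuffer, and DataInputByteBuffer (instead of DataInputBuffer) lets a Writable be deserialized straight from that buffer. A minimal sketch of the same pattern, using a hypothetical IntWritable payload in place of Hive's CustomEdgeConfiguration:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataInputByteBuffer;
    import org.apache.hadoop.io.IntWritable;

    public class PayloadReadExample {
      // Reads an IntWritable back out of a ByteBuffer-backed payload,
      // mirroring the dibb.reset(payload) call in initialize() above.
      public static int readInt(ByteBuffer payload) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(payload); // no length argument needed, unlike DataInputBuffer
        IntWritable value = new IntWritable();
        value.readFields(in);
        return value.get();
      }
    }
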
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 407d8ac..0d0770f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -37,24 +37,24 @@
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
@@ -63,6 +63,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
 
 /*
  * Only works with old mapred API
@@ -74,8 +75,8 @@
   VertexManagerPluginContext context;
 
-  private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<RootInputDataInformationEvent> dataInformationEvents;
+  private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
@@ -89,7 +90,7 @@ public CustomPartitionVertex(VertexManagerPluginContext context) {
 
   @Override
   public void initialize() {
     this.context = getContext();
-    ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+    ByteBuffer byteBuf = context.getUserPayload().getPayload();
     this.numBuckets = byteBuf.getInt();
   }
 
@@ -129,8 +130,8 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
       // but that
       // means serializing another instance.
       MRInputUserPayloadProto protoPayload =
-          MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
-      this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+          MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+      this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
 
       /*
        * Currently in tez, the flow of events is thus:
@@ -146,13 +147,10 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
        */
 
       // This assumes that Grouping will always be used.
-      // Changing the InputFormat - so that the correct one is initialized in
-      // MRInput.
-      this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+      // Enabling grouping on the payload.
       MRInputUserPayloadProto updatedPayload =
-          MRInputUserPayloadProto.newBuilder(protoPayload)
-              .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
-      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+          MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+      inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
     } catch (IOException e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -162,14 +160,14 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
     Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
 
     for (Event event : events) {
-      if (event instanceof RootInputConfigureVertexTasksEvent) {
+      if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
         Preconditions
            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
                "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
-        RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+        InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
@@ -177,12 +175,12 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
         dataInformationEvents =
             Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
-      if (event instanceof RootInputUpdatePayloadEvent) {
+      if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
         Preconditions.checkState(false);
-      } else if (event instanceof RootInputDataInformationEvent) {
+      } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
-        RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+        InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
         dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
@@ -206,8 +204,8 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
     int totalResource = context.getTotalAvailableResource().getMemory();
     int taskResource = context.getVertexTaskResource().getMemory();
     float waves =
-        conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+        conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+            TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
     int availableSlots = totalResource / taskResource;
 
@@ -250,12 +248,12 @@ private void processAllEvents(String inputName,
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerDescriptor hiveEdgeManagerDesc =
-        new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
-    byte[] payload = getBytePayload(bucketToTaskMap);
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+    UserPayload payload = getBytePayload(bucketToTaskMap);
     hiveEdgeManagerDesc.setUserPayload(payload);
 
-    Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
     for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -268,52 +266,51 @@ private void processAllEvents(String inputName,
 
     LOG.info("Task count is " + taskCount);
 
-    List<RootInputDataInformationEvent> taskEvents =
+    List<InputDataInformationEvent> taskEvents =
         Lists.newArrayListWithCapacity(finalSplits.size());
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
-      RootInputDataInformationEvent diEvent =
-          new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+      MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+      InputDataInformationEvent diEvent =
+          InputDataInformationEvent.create(count, serializedSplit.toByteString().asReadOnlyByteBuffer());
       diEvent.setTargetIndex(count);
       count++;
       taskEvents.add(diEvent);
     }
 
     // Replace the Edge Managers
-    Map<String, RootInputSpecUpdate> rootInputSpecUpdate =
-        new HashMap<String, RootInputSpecUpdate>();
+    Map<String, InputSpecUpdate> rootInputSpecUpdate =
+        new HashMap<String, InputSpecUpdate>();
     rootInputSpecUpdate.put(
         inputName,
-        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     context.setVertexParallelism(
         taskCount,
-        new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
+        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
            .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
   }
 
-  private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+  UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
     CustomEdgeConfiguration edgeConf =
        new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
     DataOutputBuffer dob = new DataOutputBuffer();
     edgeConf.write(dob);
     byte[] serialized = dob.getData();
-
-    return serialized;
+    return UserPayload.create(ByteBuffer.wrap(serialized));
   }
 
-  private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+  private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
     InputSplit inputSplit = null;
     if (event.getDeserializedUserPayload() != null) {
       inputSplit = (InputSplit) event.getDeserializedUserPayload();
     } else {
-      MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+      MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
       SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
-      inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+      inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
     }
 
     if (!(inputSplit instanceof FileSplit)) {
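getBytePayload() above is the matching write side: a Writable is serialized into a DataOutputBuffer and the bytes are wrapped into a UserPayload. One detail worth noting: DataOutputBuffer.getData() returns the whole backing array, of which only getLength() bytes are valid. The patch wraps the full array, which works because the reader stops at end of data; the sketch below (an illustrative helper, not part of the patch) limits the wrap to getLength() so no unused tail is shipped:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;
    import org.apache.tez.dag.api.UserPayload;

    public class PayloadWriteExample {
      // Serializes any Writable into a UserPayload, the inverse of the
      // DataInputByteBuffer read in CustomPartitionEdge.initialize().
      public static UserPayload toUserPayload(Writable w) throws IOException {
        DataOutputBuffer dob = new DataOutputBuffer();
        w.write(dob);
        // Limit the buffer to the valid region of the backing array.
        return UserPayload.create(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
      }
    }
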
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index d65dc26..90ce01e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -70,7 +70,6 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -79,43 +78,41 @@
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -236,9 +233,10 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
+          VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -256,8 +254,8 @@ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
       break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf),
-        new InputDescriptor(mergeInputClass.getName()));
+    return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+        InputDescriptor.create(mergeInputClass.getName()));
   }
 
   /**
@@ -276,10 +274,11 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
-      desc.setUserPayload(userPayload);
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -291,7 +290,7 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
       // nothing
     }
 
-    return new Edge(v, w, createEdgeProperty(edgeProp, vConf));
+    return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
   }
 
   /*
@@ -300,7 +299,7 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
   @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
       throws IOException {
-    MRHelpers.translateVertexConfToTez(conf);
+    MRHelpers.translateMRConfToTez(conf);
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
     String partitionerClassName = conf.get("mapred.partitioner.class");
@@ -309,28 +308,28 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer
+      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
          .newBuilder(keyClass, valClass).setFromConfiguration(conf).build();
       return et1Conf.createDefaultBroadcastEdgeProperty();
     case CUSTOM_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer
+      UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
          .setFromConfiguration(conf).build();
-      EdgeManagerDescriptor edgeDesc =
-          new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+      EdgeManagerPluginDescriptor edgeDesc =
+          EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
          new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
       DataOutputBuffer dob = new DataOutputBuffer();
       edgeConf.write(dob);
       byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
+      edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
       return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
     case CUSTOM_SIMPLE_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer
+      UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
          .setFromConfiguration(conf).build();
       return et3Conf.createDefaultEdgeProperty();
@@ -338,7 +337,7 @@ private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer
+      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
          .setFromConfiguration(conf).build();
       return et4Conf.createDefaultEdgeProperty();
@@ -384,7 +383,7 @@ private Resource getContainerResource(Configuration conf) {
    */
   private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
     Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap);
+    MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
     return environment;
   }
 
@@ -398,14 +397,14 @@ private String getContainerJavaOpts(Configuration conf) {
     if (javaOpts != null && !javaOpts.isEmpty()) {
       String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
       List<String> logProps = Lists.newArrayList();
-      MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+      TezUtils.addLog4jSystemProperties(logLevel, logProps);
       StringBuilder sb = new StringBuilder();
       for (String str : logProps) {
         sb.append(str).append(" ");
       }
       return javaOpts + " " + sb.toString();
     }
-    return MRHelpers.getMapJavaOpts(conf);
+    return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
   /*
@@ -427,11 +426,11 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     Vertex map = null;
 
     // use tez to combine splits
-    boolean useTezGroupedSplits = false;
+    boolean groupSplitsInInputInitializer;
+
+    DataSourceDescriptor dataSource;
 
     int numTasks = -1;
-    Class amSplitGeneratorClass = null;
-    InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
@@ -445,9 +444,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     }
 
     if (vertexHasCustomInput) {
-      useTezGroupedSplits = false;
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
+      groupSplitsInInputInitializer = false;
+      // grouping happens in execution phase. The input payload should not enable grouping here,
+      // it will be enabled in the CustomVertex.
       inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -457,50 +456,48 @@
       // we'll set up tez to combine splits for us iff the input format
       // is HiveInputFormat
       if (inputFormatClass == HiveInputFormat.class) {
-        useTezGroupedSplits = true;
-        conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+        groupSplitsInInputInitializer = true;
+      } else {
+        groupSplitsInInputInitializer = false;
       }
     }
 
+    // set up the operator plan. Before setting up Inputs since the config is updated.
+    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
-      if (useTezGroupedSplits) {
-        amSplitGeneratorClass = HiveSplitGenerator.class;
+      if (groupSplitsInInputInitializer) {
+        // Not setting a payload, since the MRInput payload is the same and can be accessed.
+        InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+            HiveSplitGenerator.class.getName());
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+            .setCustomInitializerDescriptor(descriptor).build();
       } else {
-        amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+        // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
       }
     } else {
-      // client side split generation means we have to compute them now
-      inputSplitInfo = MRHelpers.generateInputSplits(conf,
-          new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-      numTasks = inputSplitInfo.getNumTasks();
+      // Setup client side split generation.
+      dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+          "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+      numTasks = dataSource.getNumberOfShards();
     }
 
-    // set up the operator plan
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
-    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
-    map = new Vertex(mapWork.getName(),
-        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+    UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+    map = Vertex.create(mapWork.getName(),
+        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
        setUserPayload(serializedConf), numTasks, getContainerResource(conf));
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
+    // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
-    byte[] mrInput = null;
-    if (useTezGroupedSplits) {
-      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf);
-    } else {
-      mrInput = MRHelpers.createMRInputPayload(serializedConf);
-    }
-    map.addDataSource(alias,
-        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
-        setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null));
+    map.addDataSource(alias, dataSource);
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -508,13 +505,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
       localResources.put(getBaseName(lr), lr);
     }
 
-    if (inputSplitInfo != null) {
-      // only relevant for client-side split generation
-      map.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
-          localResources);
-    }
-
     map.setTaskLocalFiles(localResources);
     return map;
   }
@@ -550,9 +540,9 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     Utilities.createTmpDirs(conf, reduceWork);
 
     // create the vertex
-    Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+    Vertex reducer = Vertex.create(reduceWork.getName(),
+        ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+        setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
        reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
            .getNumReduceTasks(), getContainerResource(conf));
 
@@ -606,10 +596,10 @@ public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
       int numContainers, Map<String, LocalResource> localResources)
       throws IOException, TezException {
 
-    ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
-    prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
+    ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+    prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-    PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
+    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
 
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
@@ -855,7 +845,7 @@ public LocalResource localizeResource(Path src, Path dest, Configuration conf)
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+    JobConf conf = new JobConf(hiveConf);
 
     conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
@@ -946,8 +936,8 @@ public Vertex createVertex(JobConf conf, BaseWork work,
     // final vertices need to have at least one output
     if (!hasChildren) {
       v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
-          new OutputDescriptor(MROutput.class.getName())
-          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null, null));
+          OutputDescriptor.create(MROutput.class.getName())
+          .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
     }
 
     return v;
@@ -1015,16 +1005,16 @@ private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v)
     if (edgeProp.isAutoReduce()) {
       Configuration pluginConf = new Configuration(false);
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+          VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
       pluginConf.setBoolean(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-      pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
          edgeProp.getMinReducer());
       pluginConf.setLong(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
          edgeProp.getInputSizePerReducer());
-      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
-      desc.setUserPayload(payload.toByteArray());
+      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+      desc.setUserPayload(payload);
       v.setVertexManagerPlugin(desc);
     }
   }
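A subtle point in the createEdge() hunks above: ByteBuffer.allocate(4).putInt(n) leaves the buffer's position at 4, so without the newly added flip() the reader on the other side (CustomPartitionVertex.initialize()'s byteBuf.getInt()) would see an empty buffer. The old code sidestepped this by calling array(), which ignores position. A compact sketch of the pattern (class and method names are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.tez.dag.api.UserPayload;

    public class BucketPayloadExample {
      public static UserPayload bucketCount(int numBuckets) {
        ByteBuffer b = ByteBuffer.allocate(4).putInt(numBuckets);
        // flip() resets position to 0 and sets the limit, so the
        // consumer's getInt() reads the value that was just written.
        b.flip();
        return UserPayload.create(b);
      }
    }
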
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
index 7a42b93..ce3b1d6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
@@ -25,16 +25,15 @@
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import java.net.URL;
 import java.net.JarURLConnection;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -57,14 +56,13 @@
 
   private Configuration conf;
 
-  public HivePreWarmProcessor(TezProcessorContext context) {
+  public HivePreWarmProcessor(ProcessorContext context) {
     super(context);
   }
 
   @Override
   public void initialize() throws Exception {
-    TezProcessorContext processorContext = getContext();
-    byte[] userPayload = processorContext.getUserPayload();
+    UserPayload userPayload = getContext().getUserPayload();
     this.conf = TezUtils.createConfFromUserPayload(userPayload);
   }
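HivePreWarmProcessor.initialize() above shows the consumer half of the configuration hand-off; in 0.5, TezUtils owns both directions (the MRHelpers payload helpers are gone). A sketch of the full round-trip under that assumption:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.UserPayload;

    public class ConfPayloadRoundTrip {
      // Configuration -> UserPayload on the client side, UserPayload ->
      // Configuration inside the task, as the patched initialize() does.
      public static Configuration roundTrip(Configuration conf) throws IOException {
        UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
        return TezUtils.createConfFromUserPayload(payload);
      }
    }
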
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index ebd2c8f..f9fbf22 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -35,22 +35,23 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -62,25 +63,25 @@
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-public class HiveSplitGenerator extends TezRootInputInitializer {
+public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
   private static final SplitGrouper grouper = new SplitGrouper();
 
-  public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
+  public HiveSplitGenerator(InputInitializerContext initializerContext) {
     super(initializerContext);
   }
 
   @Override
   public List<Event> initialize() throws Exception {
-    TezRootInputInitializerContext rootInputContext = getContext();
+    InputInitializerContext rootInputContext = getContext();
 
     MRInputUserPayloadProto userPayloadProto =
-        MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+        MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
 
     Configuration conf =
-        MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
     boolean sendSerializedEvents =
         conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -91,7 +92,8 @@ public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = conf.get("mapred.input.format.class");
-    if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
+    boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+    if (groupingEnabled) {
       // Need to instantiate the realInputFormat
       InputFormat inputFormat =
          (InputFormat) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
@@ -103,8 +105,8 @@ public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
 
       // Create the un-grouped splits
       float waves =
-          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
       LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
@@ -124,7 +126,13 @@ public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
          new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
     } else {
       // no need for grouping and the target #of tasks.
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
+      // This code path should never be triggered at the moment. If grouping is disabled,
+      // DAGUtils uses MRInputAMSplitGenerator.
+      // If this is used in the future - make sure to disable grouping in the payload, if it isn't already disabled
+      throw new RuntimeException(
+          "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+              + realInputFormatName);
+      // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
 
     return createEventList(sendSerializedEvents, inputSplitInfo);
@@ -181,31 +189,32 @@ public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
     return groupedSplits;
   }
 
-  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception {
+  @Override
+  public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
   }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
 
-    RootInputConfigureVertexTasksEvent configureVertexEvent = new
-        RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-        inputSplitInfo.getTaskLocationHints(),
-        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+    InputConfigureVertexTasksEvent configureVertexEvent =
+        InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {
       MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
       int count = 0;
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
-        RootInputDataInformationEvent diEvent =
-            new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+        InputDataInformationEvent diEvent =
+            InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer());
         events.add(diEvent);
       }
     } else {
       int count = 0;
       for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
-        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+        InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split);
        events.add(diEvent);
      }
    }
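HiveSplitGenerator's new shape above, a constructor taking the context, an initialize() returning the event list, and an @Override on handleInputInitializerEvent(), is the entire InputInitializer contract in Tez 0.5. A stripped-down skeleton for reference (NoOpInitializer is illustrative, not part of the patch):

    import java.util.Collections;
    import java.util.List;
    import org.apache.tez.runtime.api.Event;
    import org.apache.tez.runtime.api.InputInitializer;
    import org.apache.tez.runtime.api.InputInitializerContext;
    import org.apache.tez.runtime.api.events.InputInitializerEvent;

    // Minimal InputInitializer: the framework constructs it with a context
    // and calls initialize(), which returns the configure/data events.
    public class NoOpInitializer extends InputInitializer {
      public NoOpInitializer(InputInitializerContext context) {
        super(context);
      }

      @Override
      public List<Event> initialize() throws Exception {
        // Real initializers emit InputConfigureVertexTasksEvent and
        // InputDataInformationEvents, as createEventList() does above.
        return Collections.emptyList();
      }

      @Override
      public void handleInputInitializerEvent(List<InputInitializerEvent> events)
          throws Exception {
        // no incoming events expected in this sketch
      }
    }
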
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index d964eb1..9e57259 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -47,7 +47,7 @@
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -65,7 +65,7 @@
   private MapWork mapWork;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
index e388a0d..01d716d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * Subclass that is used to indicate if this is a map or reduce process
 */
 public class MapTezProcessor extends TezProcessor {
-  public MapTezProcessor(TezProcessorContext context) {
+  public MapTezProcessor(ProcessorContext context) {
     super(context);
     this.isMap = true;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index 9ebe406..c2462d7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -20,7 +20,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 
 /**
@@ -30,6 +30,7 @@
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  // TODO HIVE-7809. This is broken. A new instance of ObjectRegistry should not be created.
   private final ObjectRegistry registry = new ObjectRegistryImpl();
 
   @Override
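The HIVE-7809 TODO above flags that instantiating ObjectRegistryImpl directly no longer yields the registry the framework shares across tasks in a container. Assuming the 0.5 runtime exposes the shared registry on the task context (the org.apache.tez.runtime.api.ObjectRegistry import added here points that way), the eventual fix would look roughly like:

    import org.apache.tez.runtime.api.ObjectRegistry;
    import org.apache.tez.runtime.api.ProcessorContext;

    public class RegistryLookupExample {
      // Fetches the framework-provided registry instead of constructing
      // an ObjectRegistryImpl directly (the broken pattern flagged above).
      public static ObjectRegistry fromContext(ProcessorContext context) {
        return context.getObjectRegistry();
      }
    }
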
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 1577827..994721f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -32,7 +32,7 @@
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
-  protected TezProcessorContext processorContext;
+  protected ProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -72,7 +72,7 @@
    * @param outputs map of Output names to {@link LogicalOutput}s
    * @throws Exception
    */
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e884afd..0a9cafa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,14 +58,13 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -113,7 +111,7 @@
   private List[] valueStringWriters;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
index c79444e..256aa7e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * Subclass that is used to indicate if this is a map or reduce process
 */
 public class ReduceTezProcessor extends TezProcessor {
-  public ReduceTezProcessor(TezProcessorContext context) {
+  public ReduceTezProcessor(ProcessorContext context) {
     super(context);
     this.isMap = false;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index c315dd2..54ac686 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -142,7 +142,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
           if (!running) {
             perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
             console.printInfo("Status: Running (application id: "
-                +dagClient.getApplicationId()+")\n");
+                +dagClient.getExecutionContext()+")\n");
             for (String s: progressMap.keySet()) {
               perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
             }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index ea3770d..d542792 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,10 +34,9 @@
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
@@ -59,8 +57,6 @@
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
-  private TezProcessorContext processorContext;
-
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
   static {
@@ -70,7 +66,7 @@
     jobIdFormat.setMinimumIntegerDigits(4);
   }
 
-  public TezProcessor(TezProcessorContext context) {
+  public TezProcessor(ProcessorContext context) {
     super(context);
   }
 
@@ -88,18 +84,14 @@ public void handleEvents(List<Event> arg0) {
   @Override
   public void initialize() throws IOException {
-    TezProcessorContext processorContext = getContext();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
-    this.processorContext = processorContext;
-    //get the jobconf
-    byte[] userPayload = processorContext.getUserPayload();
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    setupMRLegacyConfigs(processorContext);
+    setupMRLegacyConfigs(getContext());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
-  private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
+  private void setupMRLegacyConfigs(ProcessorContext processorContext) {
     // Hive "insert overwrite local directory" uses task id as dir name
     // Setting the id in jobconf helps to have the similar dir name as MR
     StringBuilder taskAttemptIdBuilder = new StringBuilder("task");
@@ -134,7 +126,7 @@ public void run(Map inputs, Map out
 
     // in case of broadcast-join read the broadcast edge inputs
     // (possibly asynchronously)
-    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+    LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
     if (isMap) {
       rproc = new MapRecordProcessor();
@@ -161,8 +153,8 @@ public void run(Map inputs, Map out
 
     // Outputs will be started later by the individual Processors.
 
-    MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+    MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+    rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 19c8060..c7b2f64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -41,21 +41,17 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
 /**
@@ -165,7 +161,7 @@ public void open(HiveConf conf, String[] additionalFiles)
 
     // Create environment for AM.
     Map<String, String> amEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
+    MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
 
     // and finally we're ready to create and start the session
     // generate basic tez config
@@ -180,7 +176,7 @@ public void open(HiveConf conf, String[] additionalFiles)
       tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
     }
 
-    session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+    session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
         commonLocalResources, null);
 
     LOG.info("Opening new Tez Session (id: " + sessionId
@@ -196,9 +192,13 @@ public void open(HiveConf conf, String[] additionalFiles)
          commonLocalResources);
       try {
         session.preWarm(prewarmVertex);
-      } catch (InterruptedException ie) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Hive Prewarm threw an exception ", ie);
+      } catch (IOException ie) {
+        if (ie.getMessage().contains("Interrupted while waiting")) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Hive Prewarm threw an exception ", ie);
+          }
+        } else {
+          throw ie;
         }
       }
     }
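TezSessionState now goes through TezClient.create(...) rather than the removed constructor; the client still has to be start()ed before DAGs or pre-warm vertices are submitted. A minimal session-mode sketch (the session name and the null local-resources/credentials arguments are placeholders):

    import java.io.IOException;
    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.api.TezException;

    public class SessionExample {
      // Session-mode client via the 0.5-style static factory, mirroring
      // the TezClient.create(...) call in TezSessionState.open().
      public static TezClient openSession(TezConfiguration conf)
          throws IOException, TezException {
        TezClient client = TezClient.create("example-session", conf, true, null, null);
        client.start();
        return client;
      }
    }
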
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
index d9139b8..9801a0d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
@@ -22,9 +22,9 @@
 import java.util.Map;
 
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezMergedInputContext;
 
 /**
  * TezMergedLogicalInput is an adapter to make union input look like
@@ -34,7 +34,7 @@
   private Map readyInputs = new IdentityHashMap();
 
-  public TezMergedLogicalInput(TezMergedInputContext context, List<Input> inputs) {
+  public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) {
     super(context, inputs);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 37b1669..5caab45 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -14,7 +14,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 84c031c..f775ef2 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -33,7 +33,6 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -97,7 +96,7 @@ public void setUp() throws Exception {
           @Override
           public Vertex answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
-            return new Vertex(((BaseWork)args[1]).getName(),
+            return Vertex.create(((BaseWork)args[1]).getName(),
                mock(ProcessorDescriptor.class), 0, mock(Resource.class));
           }
         });
@@ -108,7 +107,7 @@ public Vertex answer(InvocationOnMock invocation) throws Throwable {
           @Override
           public Edge answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
-            return new Edge((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
+            return Edge.create((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
           }
         });
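The test fixups follow the same rule as the main code: the public constructors in the DAG API (Vertex, Edge, GroupInputEdge, the various descriptors) all became static create() factories in Tez 0.5. A condensed version of what the mocked answers above return (class name is illustrative):

    import static org.mockito.Mockito.mock;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;

    public class VertexFactoryExample {
      // new Vertex(...) is gone in 0.5; tests build stub vertices through
      // the static factory exactly as the mocked answer does above.
      public static Vertex stubVertex(String name) {
        return Vertex.create(name, mock(ProcessorDescriptor.class), 0, mock(Resource.class));
      }
    }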