diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6e9c4cd..84154a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -77,8 +77,6 @@
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 /**
@@ -232,7 +230,7 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork,
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
       map = new Vertex(mapWork.getName(),
-          new ProcessorDescriptor(TezProcessor.class.getName()).
+          new ProcessorDescriptor(MapTezProcessor.class.getName()).
               setUserPayload(serializedConf),
           inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
       Map<String, String> environment = new HashMap<String, String>();
@@ -312,7 +310,7 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
 
     // create the vertex
     Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceProcessor.class.getName()).
+        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
            setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
        reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 92de4a6..70cf5b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -45,26 +47,36 @@
  */
 public class MapRecordProcessor extends RecordProcessor{
 
-  private static final String PLAN_KEY = "__MAP_PLAN__";
+
   private MapOperator mapOp;
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
   private final ExecMapperContext execContext = new ExecMapperContext();
   private boolean abort = false;
+  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     super.init(jconf, mrReporter, inputs, out);
 
+    //Update JobConf using MRInput, info like filename comes via this
+    MRInput mrInput = getMRInput(inputs);
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    if (updatedConf != null) {
+      for (Entry<String, String> entry : updatedConf) {
+        jconf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
       execContext.setJc(jconf);
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
      if (mrwork == null) {
        mrwork = Utilities.getMapWork(jconf);
-        cache.cache(PLAN_KEY, mrwork);
+        cache.cache(MAP_PLAN_KEY, mrwork);
       }
       mapOp = new MapOperator();
@@ -94,6 +106,21 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Collection inp
     }
   }
 
+  private MRInput getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInput theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInput){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better logic would be to find the alias
+        theMRInput = (MRInput)inp;
+      }
+    }
+    return theMRInput;
+  }
+
   @Override
   void run() throws IOException{
     if (inputs.size() != 1) {
@@ -101,7 +128,8 @@ void run() throws IOException{
           + ", inputCount=" + inputs.size());
     }
 
-    MRInput in = (MRInput)inputs.iterator().next();
+    //change this for broadcast join
+    MRInput in = getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
new file mode 100644
index 0000000..31f3bcd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass that is used to indicate if this is a map or reduce process
+ */
+public class MapTezProcessor extends TezProcessor {
+  public MapTezProcessor(){
+    super(true);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index fff8314..70be9dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -21,7 +21,7 @@
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,7 +37,7 @@ public abstract class RecordProcessor {
 
   protected JobConf jconf;
-  protected Collection<LogicalInput> inputs;
+  protected Map<String, LogicalInput> inputs;
   protected OutputCollector out;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -59,7 +59,7 @@
    * @param inputs
    * @param out
    */
-  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       OutputCollector out){
     this.jconf = jconf;
     this.reporter = mrReporter;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
new file mode 100644
index 0000000..62e38f4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+/**
+ * Process input from tez LogicalInput and write output - for a reduce plan.
+ * Just pump the records through the query plan.
+ */
+public class ReduceRecordProcessor extends RecordProcessor{
+  private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
+
+  public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  private boolean abort = false;
+  private Deserializer inputKeyDeserializer;
+
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
+
+  TableDesc keyTableDesc;
+  TableDesc[] valueTableDesc;
+
+  ObjectInspector[] rowObjectInspector;
+  private Operator<?> reducer;
+  private boolean isTagged = false;
+
+  private Object keyObject = null;
+  private BytesWritable groupKey;
+
+  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+
+  @Override
+  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+      OutputCollector out){
+    super.init(jconf, mrReporter, inputs, out);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector keyObjectInspector;
+
+    ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
+    if (redWork == null) {
+      redWork = Utilities.getReduceWork(jconf);
+      cache.cache(REDUCE_PLAN_KEY, redWork);
+    }
+
+    reducer = redWork.getReducer();
+    reducer.setParentOperators(null); // clear out any parents as reducer is the
+    // root
+    isTagged = redWork.getNeedsTagging();
+    try {
+      keyTableDesc = redWork.getKeyDesc();
+      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+          .getDeserializerClass(), null);
+      inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
+      for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = redWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+            valueTableDesc[tag].getDeserializerClass(), null);
+        inputValueDeserializer[tag].initialize(null, valueTableDesc[tag]
+            .getProperties());
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+            .getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector[tag]);
+        rowObjectInspector[tag] = ObjectInspectorFactory
+            .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    MapredContext.init(false, new JobConf(jconf));
+
+    // initialize reduce operator tree
+    try {
+      l4j.info(reducer.dump(0));
+      reducer.initialize(jconf, rowObjectInspector);
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+
+    reducer.setOutputCollector(out);
+    reducer.setReporter(reporter);
+    MapredContext.get().setReporter(reporter);
+
+  }
+
+  @Override
+  void run() throws IOException{
+    if (inputs.size() != 1) {
+      throw new IllegalArgumentException("ReduceRecordProcessor expects single input"
+          + ", inputCount=" + inputs.size());
+    }
+
+    //TODO - change this for joins
+    ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
+    KeyValuesReader reader = in.getReader();
+
+    //process records until done
+    while(reader.next()){
+      Object key = reader.getCurrentKey();
+      Iterable<Object> values = reader.getCurrentValues();
+      boolean needMore = processKeyValues(key, values);
+      if(!needMore){
+        break;
+      }
+    }
+  }
+
+  /**
+   * @param key
+   * @param values
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processKeyValues(Object key, Iterable<Object> values) {
+    if(reducer.getDone()){
+      //done - no more records needed
+      return false;
+    }
+
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      BytesWritable keyWritable = (BytesWritable) key;
+
+      byte tag = 0;
+      if (isTagged) {
+        // remove the tag from the key coming out of the mapper
+        // and store it in a separate variable.
+        int size = keyWritable.getSize() - 1;
+        tag = keyWritable.get()[size];
+        keyWritable.setSize(size);
+      }
+
+      //Set the key, check if this is a new group or same group
+      if (!keyWritable.equals(groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          l4j.trace("End Group");
+          reducer.endGroup();
+        }
+
+        try {
+          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input key from "
+                  + Utilities.formatBinaryString(keyWritable.get(), 0,
+                      keyWritable.getSize()) + " with properties "
+                  + keyTableDesc.getProperties(), e);
+        }
+
+        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        l4j.trace("Start Group");
+        reducer.startGroup();
+        reducer.setGroupKeyObject(keyObject);
+      }
+
+      //process all the values we have for this key
+      Iterator<Object> valuesIt = values.iterator();
+      while (valuesIt.hasNext()) {
+        BytesWritable valueWritable = (BytesWritable) valuesIt.next();
+        Object valueObj;
+        try {
+          valueObj = inputValueDeserializer[tag].deserialize(valueWritable);
+        } catch (SerDeException e) {
+          throw new HiveException(
+              "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+                  + tag
+                  + ") from "
+                  + Utilities.formatBinaryString(valueWritable.get(), 0,
+                      valueWritable.getSize()) + " with properties "
+                  + valueTableDesc[tag].getProperties(), e);
+        }
+        row.clear();
+        row.add(keyObject);
+        row.add(valueObj);
+
+        try {
+          reducer.process(row, tag);
+        } catch (Exception e) {
+          String rowString = null;
+          try {
+            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+          } catch (Exception e2) {
+            rowString = "[Error getting row data with exception "
+                + StringUtils.stringifyException(e2) + " ]";
+          }
+          throw new HiveException("Hive Runtime Error while processing row (tag="
+              + tag + ") " + rowString, e);
+        }
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; //give me more
+  }
+
+  @Override
+  void close(){
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
+    // No row was processed
+    if (out == null) {
+      l4j.trace("Close called no row");
+    }
+
+    try {
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        l4j.trace("End Group");
+        reducer.endGroup();
+      }
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+
+      reducer.close(abort);
+      reportStats rps = new reportStats(reporter);
+      reducer.preorderMap(rps);
+
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
+            + e.getMessage(), e);
+      }
+    } finally {
+      MapredContext.close();
+    }
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
new file mode 100644
index 0000000..7152aae
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * Subclass that is used to indicate if this is a map or reduce process
+ */
+public class ReduceTezProcessor extends TezProcessor {
+  public ReduceTezProcessor(){
+    super(false);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index e578bd8..fd5fee1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,7 +26,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -42,16 +40,16 @@
  */
 public class TezProcessor implements LogicalIOProcessor {
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
+  private boolean isMap = false;
 
-  boolean isMap;
   RecordProcessor rproc = null;
 
   private JobConf jobConf;
 
   private TezProcessorContext processorContext;
 
-  public TezProcessor() {
-    this.isMap = true;
+  public TezProcessor(boolean isMap) {
+    this.isMap = isMap;
   }
 
   @Override
@@ -97,15 +95,7 @@ public void run(Map inputs, Map out
     LogicalInput in = inputs.values().iterator().next();
     LogicalOutput out = outputs.values().iterator().next();
 
-    MRInput input = (MRInput)in;
-    //update config
-    Configuration updatedConf = input.getConfigUpdates();
-    if (updatedConf != null) {
-      for (Entry<String, String> entry : updatedConf) {
-        this.jobConf.set(entry.getKey(), entry.getValue());
-      }
-    }
 
     KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter();
     OutputCollector collector = new KVOutputCollector(kvWriter);
@@ -114,11 +104,11 @@ public void run(Map inputs, Map out
       rproc = new MapRecordProcessor();
     }
     else{
-      throw new UnsupportedOperationException("Reduce is yet to be implemented");
+      rproc = new ReduceRecordProcessor();
     }
 
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs.values(), collector);
+    rproc.init(jobConf, mrReporter, inputs, collector);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
@@ -140,4 +130,5 @@ public void collect(Object key, Object value) throws IOException {
     }
   }
+
 }
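
Note on the dispatch pattern this patch introduces (a reviewer-style sketch, not part of the patch itself): TezProcessor no longer decides map-versus-reduce behaviour on its own; the DAG binds each vertex to a thin marker subclass (MapTezProcessor or ReduceTezProcessor) whose constructor fixes the mode, and TezProcessor.run() then instantiates the matching RecordProcessor. The subclasses presumably need a no-arg constructor because Tez creates the processor reflectively from the class name recorded in the ProcessorDescriptor, so a constructor argument on TezProcessor itself could not be supplied by the framework. The following self-contained illustration mirrors only the shape of that pattern; the class names (BaseProcessor, MapSideProcessor, ReduceSideProcessor, ProcessorDemo) are hypothetical stand-ins, not the actual Hive or Tez types.

    // Minimal sketch of constructor-flag dispatch via marker subclasses.
    abstract class BaseProcessor {
      private final boolean isMap;

      protected BaseProcessor(boolean isMap) {
        this.isMap = isMap;
      }

      // Chooses the record-level worker from the flag fixed at construction time,
      // analogous to TezProcessor.run() picking MapRecordProcessor or ReduceRecordProcessor.
      public void run() {
        Runnable recordProcessor = isMap
            ? () -> System.out.println("running map plan")
            : () -> System.out.println("running reduce plan");
        recordProcessor.run();
      }
    }

    // Thin marker subclasses: the framework would instantiate these by class name,
    // so the no-arg constructor is the only place the mode can be expressed.
    class MapSideProcessor extends BaseProcessor {
      public MapSideProcessor() { super(true); }
    }

    class ReduceSideProcessor extends BaseProcessor {
      public ReduceSideProcessor() { super(false); }
    }

    class ProcessorDemo {
      public static void main(String[] args) {
        new MapSideProcessor().run();     // prints "running map plan"
        new ReduceSideProcessor().run();  // prints "running reduce plan"
      }
    }

The same idea is what lets DagUtils.createVertex() select behaviour per vertex simply by naming MapTezProcessor or ReduceTezProcessor in the ProcessorDescriptor, while all of the shared setup stays in the common TezProcessor base class.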