diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index dd3533c..1151627 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -211,7 +211,7 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
       map = new Vertex("Map "+seqNo,
-          new ProcessorDescriptor(MapProcessor.class.getName()).
+          new ProcessorDescriptor(TezProcessor.class.getName()).
              setUserPayload(serializedConf),
          inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
       Map<String, String> environment = new HashMap<String, String>();
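The DagUtils change above points map vertices at Hive's new TezProcessor instead
of Tez's generic MapProcessor. The JobConf travels to the task as the vertex's
user payload, and TezProcessor.initialize() later in this patch rebuilds it. A
minimal sketch of that round trip, assuming only the MRHelpers/TezUtils payload
APIs used elsewhere in this patch:

    // client side (DagUtils): serialize the JobConf into the descriptor
    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
    ProcessorDescriptor desc = new ProcessorDescriptor(TezProcessor.class.getName())
        .setUserPayload(serializedConf);

    // task side (TezProcessor.initialize): recover the JobConf from the payload
    Configuration rebuilt =
        TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
    JobConf jobConf = new JobConf(rebuilt);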
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
new file mode 100644
index 0000000..6fde80f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Process input from tez LogicalInput and write output - for a map plan.
+ * Just pumps the records through the query plan.
+ */
+public class MapRecordProcessor extends RecordProcessor {
+
+  private static final String PLAN_KEY = "__MAP_PLAN__";
+  private MapOperator mapOp;
+  public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  private boolean abort = false;
+
+  @Override
+  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+      OutputCollector out) {
+    super.init(jconf, mrReporter, inputs, out);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    try {
+      execContext.setJc(jconf);
+      // create map and fetch operators; reuse the cached plan if this JVM
+      // has already deserialized it
+      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(jconf);
+        cache.cache(PLAN_KEY, mrwork);
+      }
+      mapOp = new MapOperator();
+
+      // initialize map operator
+      mapOp.setConf(mrwork);
+      mapOp.setChildren(jconf);
+      l4j.info(mapOp.dump(0));
+
+      MapredContext.init(true, new JobConf(jconf));
+      mapOp.setExecContext(execContext);
+      mapOp.initializeLocalWork(jconf);
+      mapOp.initialize(jconf, null);
+
+      mapOp.setOutputCollector(out);
+      mapOp.setReporter(reporter);
+      MapredContext.get().setReporter(reporter);
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // will this be true here?
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Map operator initialization failed", e);
+      }
+    }
+  }
+
+  @Override
+  void run() throws IOException {
+    if (inputs.size() != 1) {
+      throw new IllegalArgumentException("MapRecordProcessor expects single input"
+          + ", inputCount=" + inputs.size());
+    }
+
+    MRInput in = (MRInput) inputs.iterator().next();
+    KeyValueReader reader = in.getReader();
+
+    // process records until done
+    while (reader.next()) {
+      // ignore the key for maps - reader.getCurrentKey();
+      Object value = reader.getCurrentValue();
+      boolean needMore = processRow(value);
+      if (!needMore) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * @param value value to process
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processRow(Object value) {
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      if (mapOp.getDone()) {
+        return false; // done
+      } else {
+        // Since there is no concept of a group, we don't invoke
+        // startGroup/endGroup for a mapper
+        mapOp.process((Writable) value);
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; // give me more
+  }
+
+  @Override
+  void close() {
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
+    // detecting failed executions by exceptions thrown by the operator tree
+    try {
+      mapOp.close(abort);
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+      reportStats rps = new reportStats(reporter);
+      mapOp.preorderMap(rps);
+      return;
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators", e);
+      }
+    } finally {
+      MapredContext.close();
+    }
+  }
+}
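MapRecordProcessor is driven through the init/run/close lifecycle that the
RecordProcessor base class (next file) defines, with TezProcessor.run() later
in this patch as the caller. A condensed sketch of that contract, using only
names from this patch:

    // build the operator tree from the (cached) MapWork plan
    RecordProcessor rproc = new MapRecordProcessor();
    rproc.init(jobConf, mrReporter, inputs.values(), collector);
    // pump rows: reader.next() -> processRow(value), stopping early if the
    // operator tree reports it is done (e.g. a LIMIT has been satisfied)
    rproc.run();
    // close operators, log final counts, report stats, tear down MapredContext
    rproc.close();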
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
new file mode 100644
index 0000000..fff8314
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * Process input from tez LogicalInput and write output.
+ * It has different subclasses for map and reduce processing.
+ */
+public abstract class RecordProcessor {
+
+  protected JobConf jconf;
+  protected Collection<LogicalInput> inputs;
+  protected OutputCollector out;
+
+  public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
+
+  // used to log memory usage periodically
+  public static MemoryMXBean memoryMXBean;
+  protected boolean isLogInfoEnabled = false;
+  protected MRTaskReporter reporter;
+
+  private long numRows = 0;
+  private long nextUpdateCntr = 1;
+
+  /**
+   * Common initialization code for RecordProcessors
+   * @param jconf
+   * @param mrReporter
+   * @param inputs
+   * @param out
+   */
+  void init(JobConf jconf, MRTaskReporter mrReporter, Collection<LogicalInput> inputs,
+      OutputCollector out) {
+    this.jconf = jconf;
+    this.reporter = mrReporter;
+    this.inputs = inputs;
+    this.out = out;
+
+    // Allocate the bean at the beginning
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+    isLogInfoEnabled = l4j.isInfoEnabled();
+
+    // log classpaths
+    try {
+      l4j.info("conf classpath = "
+          + Arrays.asList(((URLClassLoader) jconf.getClassLoader()).getURLs()));
+      l4j.info("thread classpath = "
+          + Arrays.asList(((URLClassLoader) Thread.currentThread()
+              .getContextClassLoader()).getURLs()));
+    } catch (Exception e) {
+      l4j.info("cannot get classpath: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Start processing the inputs and writing output.
+   * @throws IOException
+   */
+  abstract void run() throws IOException;
+
+  abstract void close();
+
+  /**
+   * Log information to be logged at the end
+   */
+  protected void logCloseInfo() {
+    long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+        + used_memory);
+  }
+
+  /**
+   * Log number of records processed and memory used after
+   * processing many records.
+   */
+  protected void logProgress() {
+    numRows++;
+    if (numRows == nextUpdateCntr) {
+      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      l4j.info("ExecMapper: processing " + numRows
+          + " rows: used memory = " + used_memory);
+      nextUpdateCntr = getNextUpdateRecordCounter(numRows);
+    }
+  }
+
+  private long getNextUpdateRecordCounter(long cntr) {
+    // A very simple schedule for progress logging: log at every power of ten
+    // up to 1 million rows, then at every additional million after that
+    if (cntr >= 1000000) {
+      return cntr + 1000000;
+    }
+    return 10 * cntr;
+  }
+}
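The effect of getNextUpdateRecordCounter() is a logging cadence that is
exponential at first and linear after that: rows 1, 10, 100, ..., 1,000,000,
then every additional million. A tiny standalone check of that schedule (the
demo class is hypothetical, not part of the patch; next() copies the patch's
arithmetic):

    public class LogCadenceDemo {
      static long next(long cntr) {
        return (cntr >= 1000000) ? cntr + 1000000 : 10 * cntr;
      }
      public static void main(String[] args) {
        long n = 1;
        // prints: 1 10 100 1000 10000 100000 1000000 2000000 3000000 4000000
        for (int i = 0; i < 10; i++) {
          System.out.print(n + " ");
          n = next(n);
        }
      }
    }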
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
new file mode 100644
index 0000000..b2b8b18
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+/**
+ * Hive processor for Tez that forms the vertices in Tez and processes the data.
+ * Does what ExecMapper and ExecReducer do for Hive in the MR framework.
+ */
+public class TezProcessor implements LogicalIOProcessor {
+  private static final Log LOG = LogFactory.getLog(TezProcessor.class);
+
+  boolean isMap;
+  RecordProcessor rproc = null;
+
+  private JobConf jobConf;
+
+  private TezProcessorContext processorContext;
+
+  public TezProcessor() {
+    this.isMap = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (rproc != null) {
+      rproc.close();
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> arg0) {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void initialize(TezProcessorContext processorContext)
+      throws IOException {
+    this.processorContext = processorContext;
+    // get the jobconf from the serialized user payload
+    byte[] userPayload = processorContext.getUserPayload();
+    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+    this.jobConf = new JobConf(conf);
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
+      throws IOException, InterruptedException {
+    // in case of broadcast-join read the broadcast edge inputs
+    // (possibly asynchronously)
+
+    LOG.info("Running map: " + processorContext.getUniqueIdentifier());
+
+    // TODO - change this to support shuffle joins, broadcast joins.
+    if (inputs.size() != 1 || outputs.size() != 1) {
+      throw new IOException("Cannot handle multiple inputs or outputs"
+          + ", inputCount=" + inputs.size()
+          + ", outputCount=" + outputs.size());
+    }
+    LogicalInput in = inputs.values().iterator().next();
+    LogicalOutput out = outputs.values().iterator().next();
+
+    // Sanity check
+    if (!(in instanceof MRInput)) {
+      throw new IOException(new TezException(
+          "Only Simple Input supported. Input: " + in.getClass()));
+    }
+    MRInput input = (MRInputLegacy) in;
+
+    // apply any config updates made by the input
+    Configuration updatedConf = input.getConfigUpdates();
+    if (updatedConf != null) {
+      for (Entry<String, String> entry : updatedConf) {
+        this.jobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
+    KeyValueWriter kvWriter = null;
+    // TODO: this instanceof probably can be cleaned up
+    if (!(out instanceof OnFileSortedOutput)) {
+      kvWriter = ((MROutput) out).getWriter();
+    } else {
+      kvWriter = ((OnFileSortedOutput) out).getWriter();
+    }
+
+    OutputCollector collector = new KVOutputCollector(kvWriter);
+
+    if (isMap) {
+      rproc = new MapRecordProcessor();
+    } else {
+      // TODO: implement reduce side
+      throw new UnsupportedOperationException("Reduce is yet to be implemented");
+    }
+    MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
+    rproc.init(jobConf, mrReporter, inputs.values(), collector);
+    rproc.run();
+
+    done(out);
+  }
+
+  private void done(LogicalOutput output) throws IOException {
+    if (output instanceof MROutput) {
+      MROutput sOut = (MROutput) output;
+      if (sOut.isCommitRequired()) {
+        // wait for commit approval and commit
+        // TODO EVENTUALLY - Commit is not required for map tasks.
+        // skip a couple of RPCs before exiting.
+        commit(sOut);
+      }
+    }
+  }
+
+  private void commit(MROutput output) throws IOException {
+    int retries = 3;
+    while (true) {
+      // This loops until the AM approves the commit or asks for the task to
+      // be killed, as opposed to the AM sending the task a signal to kill
+      // itself gracefully.
+      try {
+        if (processorContext.canCommit()) {
+          break;
+        }
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // ignore
+      } catch (IOException ie) {
+        LOG.warn("Failure sending canCommit: "
+            + StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+
+    // task can commit now
+    try {
+      output.commit();
+      return;
+    } catch (IOException iee) {
+      LOG.warn("Failure committing: "
+          + StringUtils.stringifyException(iee));
+      // if it couldn't commit successfully, then delete the output
+      discardOutput(output);
+      throw iee;
+    }
+  }
+
+  private void discardOutput(MROutput output) {
+    try {
+      output.abort();
+    } catch (IOException ioe) {
+      LOG.warn("Failure cleaning up: "
+          + StringUtils.stringifyException(ioe));
+    }
+  }
+
+  /**
+   * KVOutputCollector. OutputCollector that writes using KVWriter.
+   */
+  static class KVOutputCollector implements OutputCollector {
+    private final KeyValueWriter output;
+
+    KVOutputCollector(KeyValueWriter output) {
+      this.output = output;
+    }
+
+    public void collect(Object key, Object value) throws IOException {
+      output.write(key, value);
+    }
+  }
+}
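KVOutputCollector above is the shim that lets Hive's operator tree, which still
writes through the old mapred OutputCollector interface, target either Tez
output type without modification. A minimal sketch of the adaptation, using
only calls that appear in this patch (KVOutputCollector is package-local, so
this assumes code in org.apache.hadoop.hive.ql.exec.tez):

    // pick the KeyValueWriter for whichever LogicalOutput the vertex was given
    KeyValueWriter kvWriter = (out instanceof OnFileSortedOutput)
        ? ((OnFileSortedOutput) out).getWriter()
        : ((MROutput) out).getWriter();

    // hand the Hive operators a plain OutputCollector backed by that writer
    OutputCollector collector = new TezProcessor.KVOutputCollector(kvWriter);
    collector.collect(key, value); // forwards to kvWriter.write(key, value)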