diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java index 2510324..c54bffe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java @@ -44,7 +44,7 @@ public HiveMapFunctionResultList(Configuration conf, @Override protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord) throws IOException { - recordHandler.process(inputRecord._2()); + recordHandler.processRow(inputRecord._2()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java index fa12b0a..b5d5454 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java @@ -48,8 +48,10 @@ public HiveReduceFunction(byte[] buffer) { jobConf.set("mapred.reducer.class", ExecReducer.class.getName()); } - ExecReducer reducer = new ExecReducer(); - reducer.configure(jobConf); - return new HiveReduceFunctionResultList(jobConf, it, reducer); + SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler(); + HiveReduceFunctionResultList result = new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler); + reducerRecordhandler.init(jobConf, result, Reporter.NULL); + + return result; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java index 2b7e538..7dcd81a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; -import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.Reporter; @@ -32,7 +31,7 @@ public class HiveReduceFunctionResultList extends HiveBaseFunctionResultList<Tuple2<BytesWritable, Iterable<BytesWritable>>> { - private final ExecReducer reducer; + private final SparkReduceRecordHandler reduceRecordHandler; /** * Instantiate result set Iterable for Reduce function output.
@@ -42,16 +41,16 @@ */ public HiveReduceFunctionResultList(Configuration conf, Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> inputIterator, - ExecReducer reducer) { + SparkReduceRecordHandler reducer) { super(conf, inputIterator); - this.reducer = reducer; + this.reduceRecordHandler = reducer; setOutputCollector(); } @Override protected void processNextRecord(Tuple2<BytesWritable, Iterable<BytesWritable>> inputRecord) throws IOException { - reducer.reduce(inputRecord._1(), inputRecord._2().iterator(), this, Reporter.NULL); + reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator()); } @Override @@ -61,13 +60,13 @@ protected boolean processingDone() { @Override protected void closeRecordProcessor() { - reducer.close(); + reduceRecordHandler.close(); } private void setOutputCollector() { - if (reducer != null && reducer.getReducer() != null) { + if (reduceRecordHandler != null && reduceRecordHandler.getReducer() != null) { OperatorUtils.setChildrenCollector( - Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this); + Arrays.<Operator<? extends OperatorDesc>>asList(reduceRecordHandler.getReducer()), this); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index e44ca23..2537789 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -39,12 +40,8 @@ import org.apache.hadoop.util.StringUtils; import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.net.URLClassLoader; -import java.util.Arrays; +import java.util.Iterator; import java.util.List; -import java.util.Map; /** @@ -57,45 +54,22 @@ * - Catch and handle errors during execution of the operators.
* */ -public class SparkMapRecordHandler { +public class SparkMapRecordHandler extends SparkRecordHandler{ private static final String PLAN_KEY = "__MAP_PLAN__"; private MapOperator mo; - private OutputCollector oc; - private JobConf jc; - private boolean abort = false; - private Reporter rp; public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class); private boolean done; - // used to log memory usage periodically - public static MemoryMXBean memoryMXBean; - private long numRows = 0; - private long nextCntr = 1; private MapredLocalWork localWork = null; private boolean isLogInfoEnabled = false; private final ExecMapperContext execContext = new ExecMapperContext(); public void init(JobConf job, OutputCollector output, Reporter reporter) { - // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); + super.init(job, output, reporter); isLogInfoEnabled = l4j.isInfoEnabled(); - - try { - l4j.info("conf classpath = " - + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - l4j.info("thread classpath = " - + Arrays.asList(((URLClassLoader) Thread.currentThread() - .getContextClassLoader()).getURLs())); - } catch (Exception e) { - l4j.info("cannot get classpath: " + e.getMessage()); - } - - setDone(false); - ObjectCache cache = ObjectCacheFactory.getCache(job); try { @@ -128,11 +102,8 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) { mo.initializeLocalWork(jc); mo.initialize(jc, null); - oc = output; - rp = reporter; OperatorUtils.setChildrenCollector(mo.getChildOperators(), output); mo.setReporter(rp); - MapredContext.get().setReporter(reporter); if (localWork == null) { return; @@ -158,26 +129,17 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) { } } - public void process(Object value) throws IOException { + @Override + public void processRow(Object value) throws IOException { // reset the execContext for each new row execContext.resetRow(); try { - if (mo.getDone()) { - done = true; - } else { - // Since there is no concept of a group, we don't invoke - // startGroup/endGroup for a mapper - mo.process((Writable)value); - if (isLogInfoEnabled) { - numRows++; - if (numRows == nextCntr) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processing " + numRows - + " rows: used memory = " + used_memory); - nextCntr = getNextCntr(numRows); - } - } + // Since there is no concept of a group, we don't invoke + // startGroup/endGroup for a mapper + mo.process((Writable) value); + if (isLogInfoEnabled) { + logMemoryInfo(); } } catch (Throwable e) { abort = true; @@ -191,16 +153,9 @@ public void process(Object value) throws IOException { } } - - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by the - // reducer. 
It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - - return 10 * cntr; + @Override + public void processRow(Object key, Iterator values) throws IOException { + throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler."); } public void close() { @@ -229,9 +184,7 @@ public void close() { } if (isLogInfoEnabled) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " - + used_memory); + logCloseInfo(); } ReportStats rps = new ReportStats(rp); @@ -250,39 +203,6 @@ } public boolean getDone() { - return done; - } - - public boolean isAbort() { - return abort; - } - - public void setAbort(boolean abort) { - this.abort = abort; - } - - public void setDone(boolean done) { - this.done = done; - } - - /** - * reportStats. - * - */ - public static class ReportStats implements Operator.OperatorFunc { - private final Reporter rp; - - public ReportStats(Reporter rp) { - this.rp = rp; - } - - public void func(Operator op) { - Map<Enum<?>, Long> opStats = op.getStats(); - for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) { - if (rp != null) { - rp.incrCounter(e.getKey(), e.getValue()); - } - } - } + return mo.getDone(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java new file mode 100644 index 0000000..bb4465a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Iterator; + +public abstract class SparkRecordHandler { + private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class); + + // used to log memory usage periodically + protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + + protected JobConf jc; + protected OutputCollector oc; + protected Reporter rp; + protected boolean abort = false; + private long rowNumber = 0; + private long nextLogThreshold = 1; + + public void init(JobConf job, OutputCollector output, Reporter reporter) { + jc = job; + MapredContext.init(false, new JobConf(jc)); + + oc = output; + rp = reporter; + MapredContext.get().setReporter(reporter); + + LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); + + try { + LOG.info("conf classpath = " + + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); + LOG.info("thread classpath = " + + Arrays.asList(((URLClassLoader) Thread.currentThread() + .getContextClassLoader()).getURLs())); + } catch (Exception e) { + LOG.info("cannot get classpath: " + e.getMessage()); + } + } + + /** + * Process row with single value. + */ + public abstract void processRow(Object value) throws IOException; + + /** + * Process row with key and value collection. + */ + public abstract void processRow(Object key, Iterator values) throws IOException; + + /** + * Log processed row number and used memory info. + */ + protected void logMemoryInfo() { + rowNumber++; + if (rowNumber == nextLogThreshold) { + long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); + LOG.info("ExecReducer: processing " + rowNumber + + " rows: used memory = " + used_memory); + nextLogThreshold = getNextLogThreshold(rowNumber); + } + } + + abstract void close(); + + /** + * Log information to be logged at the end + */ + protected void logCloseInfo() { + long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); + LOG.info("ExecMapper: processed " + rowNumber + " rows: used memory = " + + used_memory); + } + + private long getNextLogThreshold(long currentThreshold) { + // A very simple counter to keep track of number of rows processed by the + // reducer. It dumps + // every 1 million times, and quickly before that + if (currentThreshold >= 1000000) { + return currentThreshold + 1000000; + } + + return 10 * currentThreshold; + } + + public boolean isAbort() { + return abort; + } + + public void setAbort(boolean abort) { + this.abort = abort; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java new file mode 100644 index 0000000..c0b6b78 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.ObjectCache; +import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +/** + * Clone from ExecReducer, it is the bridge between the spark framework and + * the Hive operator pipeline at execution time. It's main responsibilities are: + * + * - Load and setup the operator pipeline from XML + * - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators + * - Sending start and end group messages to separate records with same key from one another + * - Catch and handle errors during execution of the operators. 
+ * + */ +public class SparkReduceRecordHandler extends SparkRecordHandler{ + + private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class); + private static final String PLAN_KEY = "__REDUCE_PLAN__"; + + // Input value serde needs to be an array to support different SerDe + // for different tags + private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE]; + private final Object[] valueObject = new Object[Byte.MAX_VALUE]; + private final List row = new ArrayList(Utilities.reduceFieldNameList.size()); + private final boolean isLogInfoEnabled = LOG.isInfoEnabled(); + + // TODO: move to DynamicSerDe when it's ready + private Deserializer inputKeyDeserializer; + private Operator reducer; + private boolean isTagged = false; + private TableDesc keyTableDesc; + private TableDesc[] valueTableDesc; + private ObjectInspector[] rowObjectInspector; + + // runtime objects + private transient Object keyObject; + private transient BytesWritable groupKey; + + public void init(JobConf job, OutputCollector output, Reporter reporter) { + super.init(job, output, reporter); + + rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; + ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE]; + ObjectInspector keyObjectInspector; + + ObjectCache cache = ObjectCacheFactory.getCache(jc); + ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY); + if (gWork == null) { + gWork = Utilities.getReduceWork(job); + cache.cache(PLAN_KEY, gWork); + } else { + Utilities.setReduceWork(job, gWork); + } + + reducer = gWork.getReducer(); + reducer.setParentOperators(null); // clear out any parents as reducer is the + // root + isTagged = gWork.getNeedsTagging(); + try { + keyTableDesc = gWork.getKeyDesc(); + inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc + .getDeserializerClass(), null); + SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null); + keyObjectInspector = inputKeyDeserializer.getObjectInspector(); + valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()]; + for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) { + // We should initialize the SerDe with the TypeInfo when available. 
+ valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag); + inputValueDeserializer[tag] = ReflectionUtils.newInstance( + valueTableDesc[tag].getDeserializerClass(), null); + SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null, + valueTableDesc[tag].getProperties(), null); + valueObjectInspector[tag] = inputValueDeserializer[tag] + .getObjectInspector(); + + ArrayList ois = new ArrayList(); + ois.add(keyObjectInspector); + ois.add(valueObjectInspector[tag]); + reducer.setGroupKeyObjectInspector(keyObjectInspector); + rowObjectInspector[tag] = ObjectInspectorFactory + .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + reducer.setReporter(rp); + + // initialize reduce operator tree + try { + LOG.info(reducer.dump(0)); + reducer.initialize(jc, rowObjectInspector); + } catch (Throwable e) { + abort = true; + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + throw new RuntimeException("Reduce operator initialization failed", e); + } + } + } + + @Override + public void processRow(Object value) throws IOException { + throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler."); + } + + @Override + public void processRow(Object key, Iterator values) throws IOException { + if (reducer.getDone()) { + return; + } + + try { + BytesWritable keyWritable = (BytesWritable) key; + byte tag = 0; + if (isTagged) { + // remove the tag from key coming out of reducer + // and store it in separate variable. + int size = keyWritable.getSize() - 1; + tag = keyWritable.get()[size]; + keyWritable.setSize(size); + } + + if (!keyWritable.equals(groupKey)) { + // If a operator wants to do some work at the beginning of a group + if (groupKey == null) { // the first group + groupKey = new BytesWritable(); + } else { + // If a operator wants to do some work at the end of a group + LOG.trace("End Group"); + reducer.endGroup(); + } + + try { + keyObject = inputKeyDeserializer.deserialize(keyWritable); + } catch (Exception e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input key from " + + Utilities.formatBinaryString(keyWritable.get(), 0, + keyWritable.getSize()) + " with properties " + + keyTableDesc.getProperties(), e); + } + + groupKey.set(keyWritable.get(), 0, keyWritable.getSize()); + LOG.trace("Start Group"); + reducer.setGroupKeyObject(keyObject); + reducer.startGroup(); + } + // System.err.print(keyObject.toString()); + while (values.hasNext()) { + BytesWritable valueWritable = (BytesWritable) values.next(); + // System.err.print(who.getHo().toString()); + try { + valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable); + } catch (SerDeException e) { + throw new HiveException( + "Hive Runtime Error: Unable to deserialize reduce input value (tag=" + + tag + + ") from " + + Utilities.formatBinaryString(valueWritable.get(), 0, + valueWritable.getSize()) + " with properties " + + valueTableDesc[tag].getProperties(), e); + } + row.clear(); + row.add(keyObject); + row.add(valueObject[tag]); + if (isLogInfoEnabled) { + logMemoryInfo(); + } + try { + reducer.processOp(row, tag); + } catch (Exception e) { + String rowString = null; + try { + rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]); + } catch (Exception e2) { + rowString = "[Error getting row data with exception " + + StringUtils.stringifyException(e2) + " ]"; + } + 
throw new HiveException("Hive Runtime Error while processing row (tag=" + + tag + ") " + rowString, e); + } + } + + } catch (Throwable e) { + abort = true; + if (e instanceof OutOfMemoryError) { + // Don't create a new object if we are already out of memory + throw (OutOfMemoryError) e; + } else { + LOG.fatal(StringUtils.stringifyException(e)); + throw new RuntimeException(e); + } + } + } + + public void close() { + + // No row was processed + if (oc == null) { + LOG.trace("Close called without any rows processed"); + } + + try { + if (groupKey != null) { + // If a operator wants to do some work at the end of a group + LOG.trace("End Group"); + reducer.endGroup(); + } + if (isLogInfoEnabled) { + logCloseInfo(); + } + + reducer.close(abort); + ReportStats rps = new ReportStats(rp); + reducer.preorderMap(rps); + + } catch (Exception e) { + if (!abort) { + // signal new failure to map-reduce + LOG.error("Hit error while closing operators - failing tree"); + throw new RuntimeException("Hive Runtime Error while closing operators: " + + e.getMessage(), e); + } + } finally { + MapredContext.close(); + Utilities.clearWorkMap(); + } + } + + public Operator getReducer() { + return reducer; + } +}
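
A minimal usage sketch (not part of the patch) of how the new record-handler lifecycle introduced above is expected to be driven. It mirrors the wiring in HiveReduceFunction.call in this diff: construct the handler, wrap the shuffled input in the result list, then call init() with the result list acting as the OutputCollector before any rows are processed. The class name ReduceHandlerWiringSketch and the wire(...) method are illustrative only; the rest uses the classes and signatures from this diff and assumes scala.Tuple2 plus the Hive/Hadoop dependencies are on the classpath.

package org.apache.hadoop.hive.ql.exec.spark;

import java.util.Iterator;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

import scala.Tuple2;

public class ReduceHandlerWiringSketch {
  public static HiveReduceFunctionResultList wire(JobConf jobConf,
      Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> input) {
    // 1. The handler does no work at construction time; it is inert until init() runs.
    SparkReduceRecordHandler handler = new SparkReduceRecordHandler();

    // 2. The result list wraps the shuffled input and also serves as the
    //    OutputCollector that the reduce side writes into.
    HiveReduceFunctionResultList resultList =
        new HiveReduceFunctionResultList(jobConf, input, handler);

    // 3. init() retrieves the ReduceWork plan, builds the key/value deserializers
    //    and initializes the operator tree; Reporter.NULL matches what
    //    HiveReduceFunction passes in this patch.
    handler.init(jobConf, resultList, Reporter.NULL);

    // Iterating over resultList then calls handler.processRow(key, values) for each
    // shuffled record and handler.close() once the input iterator is exhausted.
    return resultList;
  }
}

The same pattern applies on the map side, where HiveMapFunctionResultList calls recordHandler.processRow(value) per record against a SparkMapRecordHandler initialized the same way.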