diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 79af08d..bf94930 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -36,8 +36,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -75,6 +75,7 @@ public void load(
         HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
     boolean useLazyRows = HiveConf.getBoolVar(hconf,
         HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE);
+    TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf);
     // We only check if we can use optimized keys here; that is ok because we don't
     // create optimized keys in MapJoin if hash map doesn't have optimized keys.
     if (!HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS)) {
@@ -86,7 +87,8 @@ public void load(
         continue;
       }
 
-      LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+      String inputName = parentToInput.get(pos);
+      LogicalInput input = tezContext.getInput(inputName);
 
       try {
         KeyValueReader kvReader = (KeyValueReader) input.getReader();
@@ -119,6 +121,9 @@ public void load(
       } catch (Exception e) {
         throw new HiveException(e);
       }
+      // Register that the Input has been cached.
+      tezCacheAccess.registerCachedInput(inputName);
+      LOG.info("Setting Input: " + inputName + " as cached");
     }
     if (lastKey == null) {
       lastKey = new MapJoinKeyObject(); // No rows in tables, the key type doesn't matter.
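// Reviewer sketch, not part of the patch: the load-once pattern the hunks above
// introduce. While building its hash table the loader drains each broadcast
// edge, then registers the input name through TezCacheAccess (added later in
// this patch) so TezProcessor can skip start() for that input in later tasks
// that reuse the container. The helper name loadBroadcastInputOnce and the
// Runnable parameter are hypothetical.

package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.hadoop.conf.Configuration;

class BroadcastLoadSketch {
  static void loadBroadcastInputOnce(Configuration conf, String inputName, Runnable drainInput) {
    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(conf);
    if (cacheAccess.isInputCached(inputName)) {
      return; // an earlier task in this container already built the hash table
    }
    drainInput.run();                           // read the broadcast rows exactly once
    cacheAccess.registerCachedInput(inputName); // later tasks skip Input.start()
  }
}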
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 380f21d..4d724f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -35,24 +35,26 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
  */
-public class MapRecordProcessor extends RecordProcessor{
+public class MapRecordProcessor extends RecordProcessor {
 
   private MapOperator mapOp;
@@ -63,18 +65,13 @@
   private MapWork mapWork;
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, outMap);
+    super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     //Update JobConf using MRInput, info like filename comes via this
-    MRInputLegacy mrInput = getMRInput(inputs);
-    try {
-      mrInput.init();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed while initializing MRInput", e);
-    }
+    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry<String, String> entry : updatedConf) {
@@ -82,6 +79,14 @@
       }
     }
 
+    createOutputMap();
+    // Start all the Outputs.
+    for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+      l4j.info("Starting Output: " + outputEntry.getKey());
+      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      outputEntry.getValue().start();
+    }
+
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
     try {
@@ -143,25 +148,10 @@
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
-  private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
-    //there should be only one MRInput
-    MRInputLegacy theMRInput = null;
-    for(LogicalInput inp : inputs.values()){
-      if(inp instanceof MRInputLegacy){
-        if(theMRInput != null){
-          throw new IllegalArgumentException("Only one MRInput is expected");
-        }
-        //a better logic would be to find the alias
-        theMRInput = (MRInputLegacy)inp;
-      }
-    }
-    return theMRInput;
-  }
-
   @Override
   void run() throws IOException{
-    MRInputLegacy in = getMRInput(inputs);
+    MRInputLegacy in = TezProcessor.getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done
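// Reviewer sketch, not part of the patch: what remains of the map-side MRInput
// handling after this change. getMRInput()/init() moved to TezProcessor so that
// MRInputLegacy.init() runs before any Input is started; init() here only folds
// the config updates (e.g. the input file name) into the JobConf. The helper
// name applyConfigUpdates is hypothetical; the calls are the ones used above.

import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.mapreduce.input.MRInputLegacy;

class MapInitSketch {
  static void applyConfigUpdates(JobConf jconf, MRInputLegacy mrInput) {
    Configuration updatedConf = mrInput.getConfigUpdates();
    if (updatedConf != null) {
      // Configuration iterates as Map.Entry<String, String> pairs
      for (Entry<String, String> entry : updatedConf) {
        jconf.set(entry.getKey(), entry.getValue());
      }
    }
  }
}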
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index ea771f3..217a40c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -16,20 +16,26 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -39,7 +45,9 @@
 
   protected JobConf jconf;
   protected Map<String, LogicalInput> inputs;
+  protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
+  protected TezProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -54,20 +62,22 @@
   protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected String CLASS_NAME = RecordProcessor.class.getName();
 
-
   /**
    * Common initialization code for RecordProcessors
    * @param jconf
+   * @param processorContext the {@link TezProcessorContext}
    * @param mrReporter
-   * @param inputs
-   * @param out
+   * @param inputs map of Input names to {@link LogicalInput}s
+   * @param outputs map of Output names to {@link LogicalOutput}s
+   * @throws Exception
    */
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;
     this.inputs = inputs;
-    this.outMap = outMap;
+    this.outputs = outputs;
+    this.processorContext = processorContext;
 
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -92,9 +102,9 @@
 
   /**
    * start processing the inputs and writing output
-   * @throws IOException
+   * @throws Exception
    */
-  abstract void run() throws IOException;
+  abstract void run() throws Exception;
 
   abstract void close();
 
@@ -132,4 +142,12 @@ private long getNextUpdateRecordCounter(long cntr) {
     return 10 * cntr;
   }
 
+  protected void createOutputMap() {
+    Preconditions.checkState(outMap == null, "Outputs should only be setup once");
+    outMap = Maps.newHashMap();
+    for (Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+      TezKVOutputCollector collector = new TezKVOutputCollector(entry.getValue());
+      outMap.put(entry.getKey(), collector);
+    }
+  }
 }
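// Reviewer sketch, not part of the patch: the setup-once guard used by
// createOutputMap() above, in isolation. Preconditions.checkState throws
// IllegalStateException with the given message when the condition is false, so
// a second call fails fast instead of silently replacing the collectors.

import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

class SetupOnceSketch {
  private Map<String, Object> collectors;

  void setupOnce() {
    Preconditions.checkState(collectors == null, "Outputs should only be setup once");
    collectors = Maps.newHashMap();
  }
}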
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index d89f2c7..58197bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.InputMerger;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -53,9 +55,14 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
+import com.google.common.collect.Lists;
+
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
@@ -88,10 +95,10 @@
   List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
   @Override
-  void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      Map<String, OutputCollector> outMap){
+  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, outMap);
+    super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -163,6 +170,7 @@
     if (dummyOps != null) {
       children.addAll(dummyOps);
     }
+    createOutputMap();
     OperatorUtils.setChildrenCollector(children, outMap);
 
     reducer.setReporter(reporter);
@@ -182,10 +190,20 @@
   }
 
   @Override
-  void run() throws IOException{
+  void run() throws Exception {
     List<LogicalInput> shuffleInputs = getShuffleInputs(inputs);
 
-    KeyValuesReader kvsReader;
+    if (shuffleInputs != null) {
+      l4j.info("Waiting for ShuffleInputs to become ready");
+      processorContext.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
+    }
+    for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+      l4j.info("Starting Output: " + outputEntry.getKey());
+      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      outputEntry.getValue().start();
+    }
+
+    KeyValuesReader kvsReader;
     try {
       if(shuffleInputs.size() == 1){
         //no merging of inputs required
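// Reviewer sketch, not part of the patch: the blocking step run() now performs.
// Shuffled inputs are only usable once the framework reports them ready, so the
// reducer blocks on the whole list before creating any reader; outputs are
// started only after that. waitForAllInputsReady is the Tez API call used
// above; the helper name awaitShuffleInputs is hypothetical.

import java.util.ArrayList;
import java.util.List;

import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezProcessorContext;

class ShuffleWaitSketch {
  static void awaitShuffleInputs(TezProcessorContext context, List<LogicalInput> shuffleInputs)
      throws Exception {
    if (shuffleInputs != null) {
      // blocks until every listed Input can hand out a reader
      context.waitForAllInputsReady(new ArrayList<Input>(shuffleInputs));
    }
  }
}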
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
index e69de29..067731c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+
+/**
+ * Access to the Object cache from Tez, along with utility methods for accessing specific Keys.
+ */
+public class TezCacheAccess {
+
+  private TezCacheAccess(ObjectCache cache) {
+    this.cache = cache;
+  }
+
+  private ObjectCache cache;
+
+  public static TezCacheAccess createInstance(Configuration conf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(conf);
+    return new TezCacheAccess(cache);
+  }
+
+  private static final String CACHED_INPUT_KEY = "CACHED_INPUTS";
+
+  private final ReentrantLock cachedInputLock = new ReentrantLock();
+
+  public boolean isInputCached(String inputName) {
+    this.cachedInputLock.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      if (cachedInputs == null) {
+        return false;
+      } else {
+        return cachedInputs.contains(inputName);
+      }
+    } finally {
+      this.cachedInputLock.unlock();
+    }
+  }
+
+  public void registerCachedInput(String inputName) {
+    this.cachedInputLock.lock();
+    try {
+      @SuppressWarnings("unchecked")
+      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
+      if (cachedInputs == null) {
+        cachedInputs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        cache.cache(CACHED_INPUT_KEY, cachedInputs);
+      }
+      cachedInputs.add(inputName);
+    } finally {
+      this.cachedInputLock.unlock();
+    }
+  }
+
+}
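// Reviewer sketch, not part of the patch: the intended round trip through the
// new class. Instances are cheap wrappers; whether a registration made by one
// task is visible to the next depends on ObjectCacheFactory returning a cache
// that is shared across tasks in the container. The input name "Map 1" is a
// made-up example.

package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

class TezCacheAccessRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new JobConf();

    TezCacheAccess first = TezCacheAccess.createInstance(conf);
    first.registerCachedInput("Map 1");

    TezCacheAccess second = TezCacheAccess.createInstance(conf);
    System.out.println(second.isInputCached("Map 1")); // true when the cache is shared
    System.out.println(second.isInputCached("Map 2")); // false: never registered
  }
}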
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
index 40d0658..01d68ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
 
 /**
  * TezContext contains additional context only available with Tez
@@ -30,6 +31,8 @@
 
   // all the inputs for the tez processor
   private Map<String, LogicalInput> inputs;
+
+  private Map<String, LogicalOutput> outputs;
 
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
@@ -38,6 +41,10 @@ public TezContext(boolean isMap, JobConf jobConf) {
   public void setInputs(Map<String, LogicalInput> inputs) {
     this.inputs = inputs;
   }
+
+  public void setOutputs(Map<String, LogicalOutput> outputs) {
+    this.outputs = outputs;
+  }
 
   public LogicalInput getInput(String name) {
     if (inputs == null) {
@@ -45,4 +52,11 @@
     }
     return inputs.get(name);
   }
+
+  public LogicalOutput getOutput(String name) {
+    if (outputs == null) {
+      return null;
+    }
+    return outputs.get(name);
+  }
 }
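// Reviewer sketch, not part of the patch: getOutput() mirrors getInput(),
// returning null both when setOutputs() was never called and when no output has
// that name, so callers must handle null. The wrapper below is hypothetical.

package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.tez.runtime.api.LogicalOutput;

class TezContextLookupSketch {
  static LogicalOutput requireOutput(TezContext tezContext, String name) {
    LogicalOutput output = tezContext.getOutput(name);
    if (output == null) {
      throw new IllegalStateException("No LogicalOutput named " + name);
    }
    return output;
  }
}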
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 9be2aa2..691bdfd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -21,15 +21,16 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -43,6 +44,9 @@
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
  */
 public class TezProcessor implements LogicalIOProcessor {
+
+
+
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
 
   private boolean isMap = false;
@@ -123,32 +127,34 @@ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> out
 
     // in case of broadcast-join read the broadcast edge inputs
    // (possibly asynchronously)
-    LOG.info("Running map: " + processorContext.getUniqueIdentifier());
-    for (LogicalInput input : inputs.values()) {
-      input.start();
-    }
-    for (LogicalOutput output : outputs.values()) {
-      output.start();
-    }
-
-    Map<String, OutputCollector> outMap = new HashMap<String, OutputCollector>();
+    LOG.info("Running task: " + processorContext.getUniqueIdentifier());
 
-    for (String outputName: outputs.keySet()) {
-      LOG.info("Handling output: " + outputName);
-      KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter();
-      OutputCollector collector = new KVOutputCollector(kvWriter);
-      outMap.put(outputName, collector);
-    }
-
-    if(isMap){
+    if (isMap) {
       rproc = new MapRecordProcessor();
-    }
-    else{
+      MRInputLegacy mrInput = getMRInput(inputs);
+      try {
+        mrInput.init();
+      } catch (IOException e) {
+        throw new RuntimeException("Failed while initializing MRInput", e);
+      }
+    } else {
       rproc = new ReduceRecordProcessor();
     }
 
+    TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
+    // Start the actual Inputs. After MRInput initialization.
+    for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+      if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+        inputEntry.getValue().start();
+      } else {
+        LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+      }
+    }
+
+    // Outputs will be started later by the individual Processors.
+
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs, outMap);
+    rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter
@@ -156,19 +162,39 @@
   }
 
   /**
-   * KVOutputCollector. OutputCollector that writes using KVWriter
-   *
+   * KVOutputCollector. OutputCollector that writes using KVWriter.
+   * Must be initialized before it is used.
+   *
    */
-  static class KVOutputCollector implements OutputCollector {
-    private final KeyValueWriter output;
+  static class TezKVOutputCollector implements OutputCollector {
+    private KeyValueWriter writer;
+    private final LogicalOutput output;
 
-    KVOutputCollector(KeyValueWriter output) {
-      this.output = output;
+    TezKVOutputCollector(LogicalOutput logicalOutput) {
+      this.output = logicalOutput;
+    }
+
+    void initialize() throws Exception {
+      this.writer = (KeyValueWriter) output.getWriter();
     }
 
     public void collect(Object key, Object value) throws IOException {
-      output.write(key, value);
+      writer.write(key, value);
    }
  }
 
+  static MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) {
+    //there should be only one MRInput
+    MRInputLegacy theMRInput = null;
+    for(LogicalInput inp : inputs.values()){
+      if(inp instanceof MRInputLegacy){
+        if(theMRInput != null){
+          throw new IllegalArgumentException("Only one MRInput is expected");
+        }
+        //a better logic would be to find the alias
+        theMRInput = (MRInputLegacy)inp;
+      }
+    }
+    return theMRInput;
+  }
 }
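// Reviewer sketch, not part of the patch: the two-phase lifecycle of the
// renamed collector. The old KVOutputCollector resolved its KeyValueWriter in
// the constructor; TezKVOutputCollector defers that to initialize(), which the
// record processors call at output-start time (initialize() first, then
// LogicalOutput.start(), matching the hunks above). The wiring helper is
// hypothetical.

package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.tez.runtime.api.LogicalOutput;

class CollectorLifecycleSketch {
  static TezProcessor.TezKVOutputCollector wire(LogicalOutput output) throws Exception {
    TezProcessor.TezKVOutputCollector collector = new TezProcessor.TezKVOutputCollector(output);
    collector.initialize(); // fetch the KeyValueWriter from the LogicalOutput
    output.start();         // then start the output, as the processors do
    return collector;       // collect(key, value) is used only after this point
  }
}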