diff --git ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
new file mode 100644
index 0000000..d962c62
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/HashTableLoaderFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.HashTableLoader;
+
+/**
+ * HashTableLoaderFactory is used to determine the strategy
+ * of loading the hashtables for the MapJoinOperator.
+ */
+public class HashTableLoaderFactory {
+
+  private HashTableLoaderFactory() {
+  }
+
+  public static HashTableLoader getLoader(Configuration hconf) {
+    if (HiveConf.getBoolVar(hconf, ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      return new org.apache.hadoop.hive.ql.exec.tez.HashTableLoader();
+    } else {
+      return new org.apache.hadoop.hive.ql.exec.mr.HashTableLoader();
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
new file mode 100644
index 0000000..a080fcc
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+
+/**
+ * HashTableLoader is an interface used by MapJoinOperator to load the hashtables
+ * needed to process the join.
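+ *
+ * Implementations are expected to fill in every mapJoinTables slot except the one
+ * at posBigTable, using the matching mapJoinTableSerdes entry for deserialization.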
+ */
+public interface HashTableLoader {
+
+  void load(ExecMapperContext context, Configuration hconf, MapJoinDesc desc, byte posBigTable,
+      MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index de4dbaf..8633321 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -18,18 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -41,7 +36,6 @@
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -55,6 +49,8 @@
   private transient String serdeKey;
   private transient ObjectCache cache;
 
+  private HashTableLoader loader;
+
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join exceeds available memory. "
@@ -82,6 +78,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     serdeKey = "__HASH_MAP_"+this.getOperatorId()+"_serde";
 
     cache = ObjectCacheFactory.getCache(hconf);
+    loader = HashTableLoaderFactory.getLoader(hconf);
 
     mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
     mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
@@ -103,7 +100,7 @@ protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
   public void generateMapMetaData() throws HiveException, SerDeException {
     // generate the meta data for key
     // index for key is -1
-    
+
     TableDesc keyTableDesc = conf.getKeyTblDesc();
     SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
         null);
@@ -128,6 +125,7 @@ public void generateMapMetaData() throws HiveException, SerDeException {
   }
 
   private void loadHashTable() throws HiveException {
+
     if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
       if (hashTblInitedOnce) {
         return;
@@ -136,50 +134,8 @@ private void loadHashTable() throws HiveException {
       }
     }
 
-    String baseDir = null;
-    String currentInputFile = getExecContext().getCurrentInputFile();
-    LOG.info("******* Load from HashTable File: input : " + currentInputFile);
-    String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
-    try {
-      if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
-        baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
-      } else {
-        Path[] localArchives;
-        String stageID = this.getExecContext().getLocalWork().getStageID();
-        String suffix = Utilities.generateTarFileName(stageID);
-        FileSystem localFs = FileSystem.getLocal(hconf);
-        localArchives = DistributedCache.getLocalCacheArchives(this.hconf);
-        Path archive;
-        for (int j = 0; j < localArchives.length; j++) {
-          archive = localArchives[j];
-          if (!archive.getName().endsWith(suffix)) {
-            continue;
-          }
-          Path archiveLocalLink = archive.makeQualified(localFs);
-          baseDir = archiveLocalLink.toUri().getPath();
-        }
-      }
-      for (int pos = 0; pos < mapJoinTables.length; pos++) {
-        if (pos == posBigTable) {
-          continue;
-        }
-        if(baseDir == null) {
-          throw new IllegalStateException("baseDir cannot be null");
-        }
-        String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName);
-        Path path = new Path(filePath);
-        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
-        ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
-            new FileInputStream(path.toUri().getPath()), 4096));
-        try{
-          mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
-        } finally {
-          in.close();
-        }
-      }
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
+    loader.load(this.getExecContext(), hconf, this.getConf(),
+        posBigTable, mapJoinTables, mapJoinTableSerdes);
     cache.cache(tableKey, mapJoinTables);
     cache.cache(serdeKey, mapJoinTableSerdes);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
index fbe8d6c..b19e7a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
@@ -26,6 +26,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
@@ -45,7 +48,10 @@ public static MapredContext get() {
   }
 
   public static MapredContext init(boolean isMap, JobConf jobConf) {
-    MapredContext context = new MapredContext(isMap, jobConf);
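+    // under Tez, hand out a TezContext so operators can reach the processor's inputs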
+    MapredContext context =
+        HiveConf.getBoolVar(jobConf, ConfVars.HIVE_OPTIMIZE_TEZ) ?
+            new TezContext(isMap, jobConf) : new MapredContext(isMap, jobConf);
     contexts.set(context);
     return context;
   }
@@ -64,7 +70,7 @@ public static void close() {
 
   private Reporter reporter;
 
-  private MapredContext(boolean isMap, JobConf jobConf) {
+  protected MapredContext(boolean isMap, JobConf jobConf) {
     this.isMap = isMap;
     this.jobConf = jobConf;
     this.udfs = new ArrayList<GenericUDF>();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
new file mode 100644
index 0000000..437026e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mr;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+/**
+ * HashTableLoader for MR loads the hashtable for MapJoins from local disk (hashtables
+ * are distributed by using the DistributedCache).
+ *
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+  private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+
+  public HashTableLoader() {
+  }
+
+  @Override
+  public void load(ExecMapperContext context,
+      Configuration hconf,
+      MapJoinDesc desc,
+      byte posBigTable,
+      MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
+    String baseDir = null;
+    String currentInputFile = context.getCurrentInputFile();
+    LOG.info("******* Load from HashTable File: input : " + currentInputFile);
+    String fileName = context.getLocalWork().getBucketFileName(currentInputFile);
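+    // local mode reads the hashtable dump files straight from the local work tmp
+    // URI; on a cluster the archive localized via the DistributedCache is used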
+    try {
+      if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
+        baseDir = context.getLocalWork().getTmpFileURI();
+      } else {
+        Path[] localArchives;
+        String stageID = context.getLocalWork().getStageID();
+        String suffix = Utilities.generateTarFileName(stageID);
+        FileSystem localFs = FileSystem.getLocal(hconf);
+        localArchives = DistributedCache.getLocalCacheArchives(hconf);
+        Path archive;
+        for (int j = 0; j < localArchives.length; j++) {
+          archive = localArchives[j];
+          if (!archive.getName().endsWith(suffix)) {
+            continue;
+          }
+          Path archiveLocalLink = archive.makeQualified(localFs);
+          baseDir = archiveLocalLink.toUri().getPath();
+        }
+      }
+      for (int pos = 0; pos < mapJoinTables.length; pos++) {
+        if (pos == posBigTable) {
+          continue;
+        }
+        if(baseDir == null) {
+          throw new IllegalStateException("baseDir cannot be null");
+        }
+        String filePath = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
+        Path path = new Path(filePath);
+        LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
+        ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
+            new FileInputStream(path.toUri().getPath()), 4096));
+        try{
+          mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
+        } finally {
+          in.close();
+        }
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index cd5bdcf..55be709 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -68,10 +68,6 @@
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -79,6 +75,10 @@
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -196,15 +196,6 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork,
 
     Path tezDir = getTezDir(mrScratchDir);
 
-    // map work can contain localwork, i.e: hashtables for map-side joins
-    Path hashTableArchive = createHashTables(mapWork, conf);
-    LocalResource localWorkLr = null;
-    if (hashTableArchive != null) {
-      localWorkLr = createLocalResource(fs,
-          hashTableArchive, LocalResourceType.ARCHIVE,
-          LocalResourceVisibility.APPLICATION);
-    }
-
     // write out the operator plan
     Path planPath = Utilities.setMapWork(conf, mapWork,
         mrScratchDir.toUri().toString(), false);
@@ -242,16 +233,13 @@
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
 
-    map.addInput(alias, 
+    map.addInput(alias,
         new InputDescriptor(MRInput.class.getName()).
             setUserPayload(serializedConf));
 
     map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    if (localWorkLr != null) {
-      localResources.put(hashTableArchive.getName(), localWorkLr);
-    }
     localResources.put(getBaseName(appJarLr), appJarLr);
     for (LocalResource lr: additionalLr) {
       localResources.put(getBaseName(lr), lr);
@@ -266,15 +254,6 @@
   }
 
   /*
-   * If the given MapWork has local work embedded we need to generate the corresponding
-   * hash tables and localize them. These tables will be used by the map work to do
-   * map-side joins.
-   */
-  private static Path createHashTables(MapWork mapWork, Configuration conf) {
-    return null;
-  }
-
-  /*
    * Helper function to create JobConf for specific ReduceWork.
    */
   private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
@@ -610,7 +589,7 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
 
     // final vertices need to have at least one output
     if (!hasChildren) {
-      v.addOutput("out_"+work.getName(), 
+      v.addOutput("out_"+work.getName(),
          new OutputDescriptor(MROutput.class.getName())
              .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
new file mode 100644
index 0000000..2ec5561
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+
+/**
+ * HashTableLoader for Tez constructs the hashtable from records read from
+ * a broadcast edge.
+ */
+public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+
+  private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+
+  public HashTableLoader() {
+  }
+
+  @SuppressWarnings("unused")
+  @Override
+  public void load(ExecMapperContext context,
+      Configuration hconf,
+      MapJoinDesc desc,
+      byte posBigTable,
+      MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+    TezContext tezContext = (TezContext) MapredContext.get();
+    Map parentToInput = desc.getParentToInput();
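+    // building the hashtables from these broadcast inputs is not implemented yet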
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index ca317c6..23400e4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -86,6 +86,8 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> in
     l4j.info(mapOp.dump(0));
 
     MapredContext.init(true, new JobConf(jconf));
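+    // make this vertex's logical inputs reachable from operators via the context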
+    ((TezContext)MapredContext.get()).setInputs(inputs);
     mapOp.setExecContext(execContext);
     mapOp.initializeLocalWork(jconf);
     mapOp.initialize(jconf, null);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 62e38f4..eb39e5c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -128,6 +128,7 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> in
     }
 
     MapredContext.init(false, new JobConf(jconf));
+    ((TezContext)MapredContext.get()).setInputs(inputs);
 
     // initialize reduce operator tree
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
new file mode 100644
index 0000000..40d0658
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.runtime.api.LogicalInput;
+
+/**
+ * TezContext contains additional context only available with Tez.
+ */
+public class TezContext extends MapredContext {
+
+  // all the inputs for the tez processor
+  private Map<String, LogicalInput> inputs;
+
+  public TezContext(boolean isMap, JobConf jobConf) {
+    super(isMap, jobConf);
+  }
+
+  public void setInputs(Map<String, LogicalInput> inputs) {
+    this.inputs = inputs;
+  }
+
+  public LogicalInput getInput(String name) {
+    if (inputs == null) {
+      return null;
+    }
+    return inputs.get(name);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index ae272e7..d119d70 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -82,17 +82,11 @@ public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> out
 
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
 
-    //this will change when TezProcessor has support for shuffle joins and broadcast joins
-    if (inputs.size() != 1){
-      throw new IOException("Cannot handle multiple inputs "
-          + " inputCount=" + inputs.size());
-    }
-
     if(outputs.size() > 1) {
       throw new IOException("Cannot handle more than one output"
          + ", outputCount=" + outputs.size());
     }
-    LogicalInput in = inputs.values().iterator().next();
+
     LogicalOutput out = outputs.values().iterator().next();