diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index ac76214..5d30ddc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1584,7 +1584,7 @@ public int close() { FetchTask fetchTask = plan.getFetchTask(); if (null != fetchTask) { try { - fetchTask.clearFetch(); + fetchTask.close(); } catch (Exception e) { LOG.debug(" Exception while clearing the Fetch task ", e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index d504b3c..f280493 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -77,7 +77,6 @@ public ColumnStatsTask() { @Override public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { super.initialize(conf, queryPlan, ctx); - work.initializeForFetch(); try { JobConf job = new JobConf(conf); ftOp = new FetchOperator(work.getfWork(), job); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0da886b..9f19418 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -34,10 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveRecordReader; @@ -48,7 +46,6 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -60,11 +57,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; @@ -122,21 +116,21 @@ public FetchOperator() { } public FetchOperator(FetchWork work, JobConf job) { - this.job = job; - this.work = work; - initialize(); + this(work, job, null, null); + } + + public FetchOperator(FetchWork work, JobConf job, Operator operator) { + this(work, job, operator, operator.getExecContext()); } - public FetchOperator(FetchWork work, JobConf job, Operator operator, - List vcCols) { + public FetchOperator(FetchWork work, JobConf job, Operator operator, 
ExecMapperContext context) { this.job = job; this.work = work; - this.operator = operator; - this.vcCols = vcCols; - initialize(); + initialize(operator, context); } - private void initialize() { + private void initialize(Operator operator, ExecMapperContext context) { + this.vcCols = getVirtualColumns(operator); if (hasVC = vcCols != null && !vcCols.isEmpty()) { List names = new ArrayList(vcCols.size()); List inspectors = new ArrayList(vcCols.size()); @@ -162,16 +156,22 @@ private void initialize() { } else { isNativeTable = true; } - setupExecContext(); + if (context == null && (hasVC || work.getSplitSample() != null)) { + context = new ExecMapperContext(); + } + this.operator = operator; + this.context = context; + + if (operator != null && context != null) { + operator.setExecContext(context); + } } - private void setupExecContext() { - if (hasVC || work.getSplitSample() != null) { - context = new ExecMapperContext(); - if (operator != null) { - operator.setExecContext(context); - } + private List getVirtualColumns(Operator ts) { + if (ts instanceof TableScanOperator && ts.getConf() != null) { + return ((TableScanOperator)ts).getConf().getVirtualCols(); } + return null; } public FetchWork getWork() { @@ -525,27 +525,37 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException /** * Get the next row and push down it to operator tree. - * Currently only used by FetchTask. + * Currently only used by FetchTask and ExecDriver (for partition sampling). **/ public boolean pushRow() throws IOException, HiveException { - if(work.getRowsComputedUsingStats() != null) { - for (List row : work.getRowsComputedUsingStats()) { - operator.processOp(row, 0); - } - operator.flush(); - return true; + if (work.getRowsComputedUsingStats() == null) { + return pushRow(getNextRow()); } - InspectableObject row = getNextRow(); + for (List row : work.getRowsComputedUsingStats()) { + operator.processOp(row, 0); + } + operator.flush(); + return true; + } + + protected boolean pushRow(InspectableObject row) throws HiveException { if (row != null) { - pushRow(row); - } else { - operator.flush(); + operator.processOp(row.o, 0); + if (!operator.getDone()) { + return true; + } } - return row != null; + operator.flush(); + return false; } - protected void pushRow(InspectableObject row) throws HiveException { - operator.processOp(row.o, 0); + public boolean pushRows() throws IOException, HiveException { + InspectableObject row; + while (!operator.getDone() && (row = getNextRow()) != null) { + operator.processOp(row.o, 0); + } + operator.flush(); + return false; } private transient final InspectableObject inspectable = new InspectableObject(); @@ -652,12 +662,10 @@ public void clearFetchContext() throws HiveException { currRecReader = null; } if (operator != null) { - operator.close(false); - operator = null; + operator.reset(); } if (context != null) { - context.clear(); - context = null; + context.reset(); } this.currTbl = null; this.currPath = null; @@ -669,17 +677,25 @@ public void clearFetchContext() throws HiveException { } } + public void close() throws HiveException { + clearFetchContext(); + if (operator != null) { + operator.close(false); + operator = null; + } + } + /** * used for bucket map join */ - public void setupContext(List paths) { + public void setupContext(String currentInput, List paths) { this.iterPath = paths.iterator(); if (work.isNotPartitioned()) { this.currTbl = work.getTblDesc(); } else { this.iterPartDesc = work.getPartDescs(paths).iterator(); } - setupExecContext(); 
+ context.setCurrentBigBucketFile(currentInput); } /** @@ -767,7 +783,7 @@ public ObjectInspector getOutputObjectInspector() throws HiveException { // shrinked size for this split. counter part of this in normal mode is // InputSplitShim.shrinkedLength. // what's different is that this is evaluated by unit of row using RecordReader.getPos() - // and that is evaluated by unit of split using InputSplt.getLength(). + // and that is evaluated by unit of split using InputSplit.getLength(). private long shrinkedLength = -1; public FetchInputFormatSplit(InputSplit split, String name) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index c4f04cb..509ac67 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -76,7 +75,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { HiveInputFormat.pushFilters(job, ts); } sink = work.getSink(); - fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); + fetch = new FetchOperator(work, job, source); source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()}); totalRows = 0; @@ -88,13 +87,6 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { } } - private List getVirtualColumns(Operator ts) { - if (ts instanceof TableScanOperator && ts.getConf() != null) { - return ((TableScanOperator)ts).getConf().getVirtualCols(); - } - return null; - } - @Override public int execute(DriverContext driverContext) { assert false; @@ -130,7 +122,7 @@ public boolean fetch(List res) throws IOException, CommandNeedRetryException { } try { if (rowsRet <= 0) { - fetch.clearFetchContext(); + fetch.close(); return false; } boolean fetched = false; @@ -170,7 +162,7 @@ public String getName() { } /** - * Clear the Fetch Operator. 
+ * Clear the Fetch Operator and reset ExecContext if exists (reusable) * * @throws HiveException */ @@ -179,4 +171,10 @@ public void clearFetch() throws HiveException { fetch.clearFetchContext(); } } + + public void close() throws HiveException { + if (fetch != null) { + fetch.close(); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java index 166461a..fdd11e2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java @@ -137,7 +137,7 @@ public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf private int sampled; public FetchSampler(FetchWork work, JobConf job, Operator operator) { - super(work, job, operator, null); + super(work, job, operator); } public void setSampleNum(int numSample) { @@ -149,23 +149,15 @@ public void setSamplePercent(float samplePercent) { } @Override - public boolean pushRow() throws IOException, HiveException { - if (!super.pushRow()) { - return false; - } - if (sampled < sampleNum) { + protected boolean pushRow(InspectableObject row) throws HiveException { + if (row != null && sampled < sampleNum) { + if (random.nextFloat() < samplePercent) { + sampled++; + return super.pushRow(row); + } return true; } - operator.flush(); return false; } - - @Override - protected void pushRow(InspectableObject row) throws HiveException { - if (random.nextFloat() < samplePercent) { - sampled++; - super.pushRow(row); - } - } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 487bb33..61f96bc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -201,7 +201,6 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf, FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone); ts.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()}); - fetchOp.clearFetchContext(); DummyStoreOperator sinkOp = aliasToSinkWork.get(alias); @@ -526,16 +525,17 @@ private void setUpFetchContexts(String alias, MergeQueue mergeQueue) throws Hive Class bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass(); BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null); - getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputPath.toString())); + String inputPathString = currentInputPath.toString(); + getExecContext().setFileId(bucketMatcherCxt.createFileId(inputPathString)); LOG.info("set task id: " + getExecContext().getFileId()); bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt .getAliasBucketFileNameMapping()); - List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputPath.toString(), + List aliasFiles = bucketMatcher.getAliasBucketFiles(inputPathString, bucketMatcherCxt.getMapJoinBigTableAlias(), alias); - mergeQueue.setupContext(aliasFiles); + mergeQueue.setupContext(inputPathString, aliasFiles); } private void fetchOneRow(byte tag) { @@ -622,7 +622,7 @@ public void closeOp(boolean abort) throws HiveException { MergeQueue mergeQueue = entry.getValue(); Operator forwardOp = localWork.getAliasToWork().get(alias); forwardOp.close(abort); - mergeQueue.clearFetchContext(); + mergeQueue.close(); } } @@ -695,15 +695,15 @@ public MergeQueue(String alias, FetchWork fetchWork, JobConf 
jobConf, // currently, number of paths is always the same (bucket numbers are all the same over // all partitions in a table). // But if hive supports assigning bucket number for each partition, this can be vary - public void setupContext(List paths) throws HiveException { + public void setupContext(String inputPathString, List paths) throws HiveException { int segmentLen = paths.size(); FetchOperator[] segments = segmentsForSize(segmentLen); for (int i = 0 ; i < segmentLen; i++) { Path path = paths.get(i); if (segments[i] == null) { - segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf)); + segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf), forwardOp); } - segments[i].setupContext(Arrays.asList(path)); + segments[i].setupContext(inputPathString, Arrays.asList(path)); } initialize(segmentLen); for (int i = 0; i < segmentLen; i++) { @@ -815,6 +815,17 @@ private boolean next(Integer current) throws IOException, HiveException { keys[current] = null; return false; } + + public void close() throws HiveException { + if (segments != null) { + for (FetchOperator op : segments) { + if (op != null) { + op.close(); + } + } + } + segments = null; + } } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 1095173..fdf3b6c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -534,7 +534,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, H OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler); while (fetcher.pushRow()) { } } finally { - fetcher.clearFetchContext(); + fetcher.close(); } } else { throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 74bc2d2..03b590c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -17,11 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.mr; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FetchOperator; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.mapred.JobConf; @@ -45,7 +42,6 @@ // if big alias is partitioned table, it's partition spec + bucket number private String fileId = null; private MapredLocalWork localWork = null; - private Map fetchOperators; private JobConf jc; private IOContext ioCxt; @@ -64,13 +60,19 @@ public ExecMapperContext() { ioCxt = IOContext.get(); } + public void reset() { + lastInputPath = null; + currentInputPath = null; + inputFileChecked = false; + } + public void clear() { IOContext.clear(); ioCxt = null; } /** - * For CompbineFileInputFormat, the mapper's input file will be changed on the + * For CombineFileInputFormat, the mapper's input file will be changed on the * fly, and the input file name is passed to jobConf by shims/initNextRecordReader. 
* If the map local work has any mapping depending on the current * mapper's input file, the work need to clear context and re-initialization @@ -138,14 +140,6 @@ public void setFileId(String fileId) { this.fileId = fileId; } - public Map getFetchOperators() { - return fetchOperators; - } - - public void setFetchOperators(Map fetchOperators) { - this.fetchOperators = fetchOperators; - } - public IOContext getIoCxt() { return ioCxt; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java index 1ba1518..d938fd6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java @@ -35,8 +35,6 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -143,8 +141,8 @@ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFil operator.setChildOperators(Arrays.>asList(sink)); } } - localTask.setExecContext(context); - localTask.startForward(inputFileName); + + localTask.startForward(context, inputFileName); MapJoinTableContainer[] tables = sink.getMapJoinTables(); for (int i = 0; i < sink.getNumParent(); i++) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 4adfc6c..3be5d2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; @@ -79,7 +78,6 @@ */ public class MapredLocalTask extends Task implements Serializable { - private final Map fetchOperators = new HashMap(); protected HadoopJobExecHelper jobExecHelper; private JobConf job; public static transient final Log l4j = LogFactory.getLog(MapredLocalTask.class); @@ -89,10 +87,6 @@ public static MemoryMXBean memoryMXBean; private static final Log LOG = LogFactory.getLog(MapredLocalTask.class); - // not sure we need this exec context; but all the operators in the work - // will pass this context throught - private ExecMapperContext execContext = new ExecMapperContext(); - private Process executor; public MapredLocalTask() { @@ -105,10 +99,6 @@ public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) thro console = new LogHelper(LOG, isSilent); } - public void setExecContext(ExecMapperContext execContext) { - this.execContext = execContext; - } - @Override public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { super.initialize(conf, queryPlan, driverContext); @@ -306,11 +296,10 @@ public int executeInProcess(DriverContext driverContext) { 
console.printInfo(Utilities.now() + "\tStarting to launch local task to process map join;\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - execContext.setJc(job); - // set the local work, so all the operator can get this context - execContext.setLocalWork(work); + + ExecMapperContext context = new ExecMapperContext(); try { - startForward(null); + startForward(context, null); long currentTime = System.currentTimeMillis(); long elapsed = currentTime - startTime; console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: " @@ -320,128 +309,113 @@ public int executeInProcess(DriverContext driverContext) { || (throwable instanceof MapJoinMemoryExhaustionException)) { l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); return 3; - } else { - l4j.error("Hive Runtime Error: Map local work failed", throwable); - return 2; } + console.printError(org.apache.hadoop.util.StringUtils.stringifyException(throwable)); + l4j.error("Hive Runtime Error: Map local work failed", throwable); + return 2; + } finally { + context.clear(); } return 0; } - public void startForward(String bigTableBucket) throws Exception { - boolean inputFileChangeSenstive = work.getInputFileChangeSensitive(); - initializeOperators(new HashMap()); + public void startForward(ExecMapperContext execContext, String bigTableBucket) + throws Exception { + + execContext.setJc(job); + // set the local work, so all the operator can get this context + execContext.setLocalWork(work); + + Map fetchers = initializeOperators(execContext); + // for each big table's bucket, call the start forward - if (inputFileChangeSenstive) { + if (work.getInputFileChangeSensitive()) { for (Map> bigTableBucketFiles : work .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) { if (bigTableBucket == null) { for (String bigTableBucketFile : bigTableBucketFiles.keySet()) { - startForward(inputFileChangeSenstive, bigTableBucketFile); + startForward(fetchers, bigTableBucketFile); } } else if (bigTableBucketFiles.keySet().contains(bigTableBucket)) { - startForward(inputFileChangeSenstive, bigTableBucket); + startForward(fetchers, bigTableBucket); } } } else { - startForward(inputFileChangeSenstive, null); + startForward(fetchers, null); } } - private void startForward(boolean inputFileChangeSenstive, String bigTableBucket) + private void startForward(Map fetchers, String bigTableBucket) throws Exception { for (Operator source : work.getAliasToWork().values()) { source.reset(); } - if (inputFileChangeSenstive) { - execContext.setCurrentBigBucketFile(bigTableBucket); - } - for (Map.Entry entry : fetchOperators.entrySet()) { + for (Map.Entry entry : fetchers.entrySet()) { String alias = entry.getKey(); FetchOperator fetchOp = entry.getValue(); - if (inputFileChangeSenstive) { - fetchOp.clearFetchContext(); + if (bigTableBucket != null) { setUpFetchOpContext(fetchOp, alias, bigTableBucket); } - - // get the root operator - Operator forwardOp = work.getAliasToWork().get(alias); - // walk through the operator tree - while (!forwardOp.getDone()) { - InspectableObject row = fetchOp.getNextRow(); - if (row == null) { - break; - } - forwardOp.processOp(row.o, 0); + try { + fetchOp.pushRows(); + } finally { + fetchOp.clearFetchContext(); } - forwardOp.flush(); } for (Operator source : work.getAliasToWork().values()) { - source.close(false); + source.close(false); // flush to file } } - private void initializeOperators(Map fetchOpJobConfMap) + private Map initializeOperators(ExecMapperContext context) throws 
HiveException { - for (Map.Entry> entry : work.getAliasToWork().entrySet()) { - LOG.debug("initializeOperators: " + entry.getKey() + ", children = " + entry.getValue().getChildOperators()); - } - // this mapper operator is used to initialize all the operators + + Map fetchOperators = new HashMap(); + + // all operators should be initialized first before fetching for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) { if (entry.getValue() == null) { continue; } JobConf jobClone = new JobConf(job); - TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey()); + Operator forwardOp = work.getAliasToWork().get(entry.getKey()); // push down projections - ColumnProjectionUtils.appendReadColumns( - jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); - // push down filters - HiveInputFormat.pushFilters(jobClone, ts); - - // create a fetch operator - FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); - fetchOpJobConfMap.put(fetchOp, jobClone); - fetchOperators.put(entry.getKey(), fetchOp); - l4j.info("fetchoperator for " + entry.getKey() + " created"); - } - // initialize all forward operator - for (Map.Entry entry : fetchOperators.entrySet()) { - // get the forward op - String alias = entry.getKey(); - Operator forwardOp = work.getAliasToWork().get(alias); + if (forwardOp instanceof TableScanOperator) { + TableScanOperator ts = (TableScanOperator) forwardOp; + ColumnProjectionUtils.appendReadColumns( + jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns()); + // push down filters + HiveInputFormat.pushFilters(jobClone, ts); + } + forwardOp.setExecContext(context); - // put the exe context into all the operators - forwardOp.setExecContext(execContext); - // All the operators need to be initialized before process - FetchOperator fetchOp = entry.getValue(); - JobConf jobConf = fetchOpJobConfMap.get(fetchOp); + // create a fetch operator & put the exe context into all the operators + FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone, forwardOp); - if (jobConf == null) { - jobConf = job; - } - // initialize the forward operator + // All the operators need to be initialized before process ObjectInspector objectInspector = fetchOp.getOutputObjectInspector(); - forwardOp.initialize(jobConf, new ObjectInspector[] {objectInspector}); + forwardOp.initialize(jobClone, new ObjectInspector[]{objectInspector}); + + fetchOperators.put(entry.getKey(), fetchOp); l4j.info("fetchoperator for " + entry.getKey() + " initialized"); } + return fetchOperators; } private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile) throws Exception { - BucketMapJoinContext bucketMatcherCxt = this.work.getBucketMapjoinContext(); + BucketMapJoinContext bucketMatcherCxt = work.getBucketMapjoinContext(); Class bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass(); - BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, - null); + BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null); bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping()); List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt .getMapJoinBigTableAlias(), alias); - fetchOp.setupContext(aliasFiles); + fetchOp.setupContext(currentInputFile, aliasFiles); } @Override @@ -461,7 +435,6 @@ public String getName() { @Override public StageType getType() { - //assert false; return StageType.MAPREDLOCAL; } diff --git 
ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java index 3cae727..6805799 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java @@ -20,8 +20,6 @@ import java.io.Serializable; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; - /** * ColumnStats Work. * @@ -66,14 +64,6 @@ public void setColStats(ColumnStatsDesc colStats) { this.colStats = colStats; } - public ListSinkOperator getSink() { - return fWork.getSink(); - } - - public void initializeForFetch() { - fWork.initializeForFetch(); - } - public int getLeastNumRows() { return fWork.getLeastNumRows(); } diff --git ql/src/test/queries/clientpositive/join_vc.q ql/src/test/queries/clientpositive/join_vc.q index 63b3da7..68149de 100644 --- ql/src/test/queries/clientpositive/join_vc.q +++ ql/src/test/queries/clientpositive/join_vc.q @@ -3,3 +3,11 @@ explain select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3; select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3; + +set hive.auto.convert.join=true; + +-- HIVE-4790 MapredLocalTask does not make virtual columns + +explain +SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE; +SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE; diff --git ql/src/test/results/clientpositive/join_vc.q.out ql/src/test/results/clientpositive/join_vc.q.out index 1a47f66..bf7ceda 100644 --- ql/src/test/results/clientpositive/join_vc.q.out +++ ql/src/test/results/clientpositive/join_vc.q.out @@ -137,3 +137,172 @@ POSTHOOK: Input: default@src 0 238 val_238 0 238 val_238 0 238 val_238 +PREHOOK: query: -- HIVE-4790 MapredLocalTask does not make virtual columns + +explain +SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-4790 MapredLocalTask does not make virtual columns + +explain +SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-5 is a root stage + Stage-2 depends on stages: Stage-5 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-5 + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 50)) (type: boolean) + Statistics: Num rows: 5 Data size: 1002 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + condition expressions: + 0 {value} {BLOCK__OFFSET__INSIDE__FILE} + 1 {key} {value} + keys: + 0 key (type: string) + 1 key (type: string) + + Stage: Stage-2 + Map Reduce + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 
50)) (type: boolean) + Statistics: Num rows: 5 Data size: 1002 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} {value} {BLOCK__OFFSET__INSIDE__FILE} + 1 {key} {value} + keys: + 0 key (type: string) + 1 key (type: string) + outputColumnNames: _col0, _col1, _col2, _col4, _col5 + Statistics: Num rows: 5 Data size: 1102 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string), _col4 (type: string), _col5 (type: string), _col2 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 5 Data size: 1102 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col4 (type: bigint) + sort order: ++ + Statistics: Num rows: 5 Data size: 1102 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string) + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY.reducesinkkey1 (type: bigint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 5 Data size: 1102 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 5 Data size: 1102 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 val_0 0 val_0 968 +0 val_0 0 val_0 968 +0 val_0 0 val_0 968 +0 val_0 0 val_0 2088 +0 val_0 0 val_0 2088 +0 val_0 0 val_0 2088 +0 val_0 0 val_0 2632 +0 val_0 0 val_0 2632 +0 val_0 0 val_0 2632 +10 val_10 10 val_10 2846 +11 val_11 11 val_11 3170 +12 val_12 12 val_12 1720 +12 val_12 12 val_12 1720 +12 val_12 12 val_12 4362 +12 val_12 12 val_12 4362 +15 val_15 15 val_15 386 +15 val_15 15 val_15 386 +15 val_15 15 val_15 2770 +15 val_15 15 val_15 2770 +17 val_17 17 val_17 910 +18 val_18 18 val_18 5340 +18 val_18 18 val_18 5340 +18 val_18 18 val_18 5514 +18 val_18 18 val_18 5514 +19 val_19 19 val_19 2824 +2 val_2 2 val_2 4004 +20 val_20 20 val_20 1118 +24 val_24 24 val_24 1972 +24 val_24 24 val_24 1972 +24 val_24 24 val_24 4594 +24 val_24 24 val_24 4594 +26 val_26 26 val_26 2226 +26 val_26 26 val_26 2226 +26 val_26 26 val_26 5284 +26 val_26 26 val_26 5284 +27 val_27 27 val_27 34 +28 val_28 28 val_28 5616 +30 val_30 30 val_30 3494 +33 val_33 33 val_33 3592 +34 val_34 34 val_34 3192 +35 val_35 35 val_35 1238 +35 val_35 35 val_35 1238 +35 val_35 35 val_35 1238 +35 val_35 35 val_35 3138 +35 val_35 35 val_35 3138 +35 val_35 35 val_35 3138 +35 val_35 35 val_35 4012 +35 val_35 35 val_35 4012 +35 val_35 35 val_35 4012 +37 val_37 37 val_37 328 +37 val_37 37 
val_37 328 +37 val_37 37 val_37 5626 +37 val_37 37 val_37 5626 +4 val_4 4 val_4 1218 +41 val_41 41 val_41 3388 +42 val_42 42 val_42 2030 +42 val_42 42 val_42 2030 +42 val_42 42 val_42 3298 +42 val_42 42 val_42 3298 +43 val_43 43 val_43 2330 +44 val_44 44 val_44 4068 +47 val_47 47 val_47 1198 +5 val_5 5 val_5 3060 +5 val_5 5 val_5 3060 +5 val_5 5 val_5 3060 +5 val_5 5 val_5 3864 +5 val_5 5 val_5 3864 +5 val_5 5 val_5 3864 +5 val_5 5 val_5 4540 +5 val_5 5 val_5 4540 +5 val_5 5 val_5 4540 +8 val_8 8 val_8 1916 +9 val_9 9 val_9 5398
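The refactoring above splits FetchOperator's lifecycle into a per-input reset path (clearFetchContext(), which now calls operator.reset() and context.reset()) and a terminal teardown path (the new close(), which clears and then closes the operator tree); callers such as Driver, FetchTask, ExecDriver and MapredLocalTask.startForward() are switched accordingly. The sketch below is illustrative only and is not part of the patch: Fetcher and SketchFetcher are hypothetical stand-ins for FetchOperator, written to show the call order the patch appears to intend, namely bind one bucket's input, push its rows, reset between buckets, and close exactly once at the end.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only. Fetcher and SketchFetcher are hypothetical
// stand-ins for FetchOperator; the method names mirror the patched API.
public class FetchLifecycleSketch {

  interface Fetcher {
    void setupContext(List<String> rows);  // analogous to FetchOperator.setupContext()
    void pushRows();                       // analogous to FetchOperator.pushRows()
    void clearFetchContext();              // per-input reset (operator.reset(), context.reset())
    void close();                          // terminal teardown (operator.close(false))
  }

  static class SketchFetcher implements Fetcher {
    private Iterator<String> current;

    @Override
    public void setupContext(List<String> rows) {
      current = rows.iterator();
    }

    @Override
    public void pushRows() {
      while (current != null && current.hasNext()) {
        System.out.println("processOp: " + current.next());
      }
    }

    @Override
    public void clearFetchContext() {
      // Forget per-input state but keep the fetcher reusable for the next bucket.
      current = null;
      System.out.println("reset for next bucket");
    }

    @Override
    public void close() {
      // The patched FetchOperator.close() also clears before closing the operator.
      clearFetchContext();
      System.out.println("operator closed");
    }
  }

  public static void main(String[] args) {
    List<List<String>> bigTableBuckets = Arrays.asList(
        Arrays.asList("row-1", "row-2"),
        Arrays.asList("row-3"));

    Fetcher fetcher = new SketchFetcher();    // created once, as in initializeOperators()
    try {
      for (List<String> bucket : bigTableBuckets) {
        fetcher.setupContext(bucket);
        try {
          fetcher.pushRows();
        } finally {
          fetcher.clearFetchContext();        // reset between buckets, not close
        }
      }
    } finally {
      fetcher.close();                        // close once, as Driver and FetchTask now do
    }
  }
}

A plausible reason for the split, consistent with the changes above, is reuse: a single fetch operator (for example a MergeQueue segment, or the FetchTask reused across successive fetch() calls) can be pointed at a new input and drained repeatedly, while the operator tree is closed only once at the end.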