diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8bb6d0f..0192c65 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1725,7 +1725,7 @@ public int close() {
    FetchTask fetchTask = plan.getFetchTask();
    if (null != fetchTask) {
      try {
-        fetchTask.clearFetch();
+        fetchTask.close();
      } catch (Exception e) {
        LOG.debug(" Exception while clearing the Fetch task ", e);
      }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index e2f696e..9ccd7f6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -79,7 +79,6 @@ public ColumnStatsTask() {
  @Override
  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
    super.initialize(conf, queryPlan, ctx);
-    work.initializeForFetch();
    try {
      JobConf job = new JobConf(conf);
      ftOp = new FetchOperator(work.getfWork(), job);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 0ccab02..3a2cdaf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -127,16 +127,17 @@ public FetchOperator(FetchWork work, JobConf job) throws HiveException {
    this(work, job, null, null);
  }

-  public FetchOperator(FetchWork work, JobConf job, Operator operator,
-      List vcCols) throws HiveException {
+  public FetchOperator(FetchWork work, JobConf job, Operator operator,
+      ExecMapperContext context) throws HiveException {
    this.job = job;
    this.work = work;
    this.operator = operator;
-    this.vcCols = vcCols;
+    this.vcCols = getVirtualColumns(operator);
    this.hasVC = vcCols != null && !vcCols.isEmpty();
    this.isStatReader = work.getTblDesc() == null;
    this.isPartitioned = !isStatReader && work.isPartitioned();
    this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
+    this.context = setupExecContext(context, operator, work.getPathLists());
    initialize();
  }
@@ -170,21 +171,27 @@ private void initialize() throws HiveException {
      iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
    }
    outputOI = setupOutputObjectInspector();
-    context = setupExecContext(operator, work.getPathLists());
  }

-  private ExecMapperContext setupExecContext(Operator operator, List paths) {
-    ExecMapperContext context = null;
-    if (hasVC || work.getSplitSample() != null) {
+  private ExecMapperContext setupExecContext(ExecMapperContext context,
+      Operator operator, List paths) {
+    if (context == null && (hasVC || work.getSplitSample() != null)) {
      context = new ExecMapperContext(job);
-      if (operator != null) {
-        operator.setExecContext(context);
-      }
+    }
+    if (operator != null && context != null) {
+      operator.setExecContext(context);
    }
    setFetchOperatorContext(job, paths);
    return context;
  }

+  private List getVirtualColumns(Operator ts) {
+    if (ts instanceof TableScanOperator && ts.getConf() != null) {
+      return ((TableScanOperator)ts).getConf().getVirtualCols();
+    }
+    return null;
+  }
+
  public FetchWork getWork() {
    return work;
  }
@@ -401,30 +408,40 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException

  /**
   * Get the next row and push down it to operator tree.
-   * Currently only used by FetchTask.
+   * Currently only used by FetchTask and ExecDriver (for partition sampling).
   **/
  public boolean pushRow() throws IOException, HiveException {
-    if (work.getRowsComputedUsingStats() != null) {
-      for (List row : work.getRowsComputedUsingStats()) {
-        operator.processOp(row, 0);
-      }
-      flushRow();
-      return true;
+    if (work.getRowsComputedUsingStats() == null) {
+      return pushRow(getNextRow());
    }
-    InspectableObject row = getNextRow();
+    for (List row : work.getRowsComputedUsingStats()) {
+      operator.processOp(row, 0);
+    }
+    flushRows();
+    return true;
+  }
+
+  protected boolean pushRow(InspectableObject row) throws HiveException {
    if (row != null) {
-      pushRow(row);
-    } else {
-      flushRow();
+      operator.processOp(row.o, 0);
+      if (!operator.getDone()) {
+        return true;
+      }
    }
-    return row != null;
+    operator.flush();
+    return false;
  }

-  protected void pushRow(InspectableObject row) throws HiveException {
-    operator.processOp(row.o, 0);
+  // push all
+  public void pushRows() throws IOException, HiveException {
+    InspectableObject row;
+    while (!operator.getDone() && (row = getNextRow()) != null) {
+      operator.processOp(row.o, 0);
+    }
+    flushRows();
  }

-  protected void flushRow() throws HiveException {
+  protected void flushRows() throws HiveException {
    operator.flush();
  }
@@ -519,12 +536,10 @@ public void clearFetchContext() throws HiveException {
      currRecReader = null;
    }
    if (operator != null) {
-      operator.close(false);
-      operator = null;
+      operator.reset();
    }
    if (context != null) {
-      context.clear();
-      context = null;
+      context.reset();
    }
    this.currPath = null;
    this.iterPath = null;
@@ -536,10 +551,18 @@
    }
  }

+  public void close() throws HiveException {
+    clearFetchContext();
+    if (operator != null) {
+      operator.close(false);
+      operator = null;
+    }
+  }
+
  /**
   * used for bucket map join
   */
-  public void setupContext(List paths) {
+  public void setupContext(String inputBucketPath, List paths) {
    this.iterPath = paths.iterator();
    List partitionDescs;
    if (!isPartitioned) {
@@ -547,7 +570,8 @@
    } else {
      this.iterPartDesc = work.getPartDescs(paths).iterator();
    }
-    this.context = setupExecContext(operator, paths);
+    context = setupExecContext(context, operator, paths);
+    context.setCurrentBigBucketFile(inputBucketPath);
  }

  /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index c4f04cb..3e3ea0e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -76,7 +75,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
        HiveInputFormat.pushFilters(job, ts);
      }
      sink = work.getSink();
-      fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
+      fetch = new FetchOperator(work, job, source, null);
      source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()});
      totalRows = 0;
@@ -88,13 +87,6 @@
    }
  }

-  private List getVirtualColumns(Operator ts) {
-    if (ts instanceof TableScanOperator && ts.getConf() != null) {
-      return ((TableScanOperator)ts).getConf().getVirtualCols();
-    }
-    return null;
-  }
-
  @Override
  public int execute(DriverContext driverContext) {
    assert false;
@@ -130,7 +122,7 @@ public boolean fetch(List res) throws IOException, CommandNeedRetryException {
    }
    try {
      if (rowsRet <= 0) {
-        fetch.clearFetchContext();
+        fetch.close();
        return false;
      }
      boolean fetched = false;
@@ -170,7 +162,7 @@ public String getName() {
  }

  /**
-   * Clear the Fetch Operator.
+   * Clear the Fetch Operator and reset ExecContext if exists (reusable)
   *
   * @throws HiveException
   */
@@ -179,4 +171,10 @@ public void clearFetch() throws HiveException {
      fetch.clearFetchContext();
    }
  }
+
+  public void close() throws HiveException {
+    if (fetch != null) {
+      fetch.close();
+    }
+  }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 96f4530..550194d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -168,23 +168,16 @@ public void setSamplePercent(float samplePercent) {
    }

    @Override
-    public boolean pushRow() throws IOException, HiveException {
-      if (!super.pushRow()) {
-        return false;
-      }
-      if (sampled < sampleNum) {
+    protected boolean pushRow(InspectableObject row) throws HiveException {
+      if (row != null && sampled < sampleNum) {
+        if (random.nextFloat() < samplePercent) {
+          sampled++;
+          return super.pushRow(row);
+        }
        return true;
      }
-      flushRow();
+      flushRows();
      return false;
    }
-
-    @Override
-    protected void pushRow(InspectableObject row) throws HiveException {
-      if (random.nextFloat() < samplePercent) {
-        sampled++;
-        super.pushRow(row);
-      }
-    }
  }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 2c9e81f..33f4fd5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -196,12 +197,8 @@ public void initializeMapredLocalWork(MapJoinDesc mjConf, Configuration hconf,
      // push down filters
      HiveInputFormat.pushFilters(jobClone, ts);
-
-      ts.setExecContext(getExecContext());
-
-      FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
+      FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone, ts, getExecContext());
      ts.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
-      fetchOp.clearFetchContext();

      DummyStoreOperator sinkOp = aliasToSinkWork.get(alias);
@@ -526,16 +523,17 @@ private void setUpFetchContexts(String alias, MergeQueue mergeQueue) throws Hive
    Class bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
    BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null);

-    getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputPath.toString()));
+    String inputPathString = currentInputPath.toString();
+    getExecContext().setFileId(bucketMatcherCxt.createFileId(inputPathString));
    LOG.info("set task id: " + getExecContext().getFileId());

    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
        .getAliasBucketFileNameMapping());

-    List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputPath.toString(),
+    List aliasFiles = bucketMatcher.getAliasBucketFiles(inputPathString,
        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
-    mergeQueue.setupContext(aliasFiles);
+    mergeQueue.setupContext(inputPathString, aliasFiles);
  }

  private void fetchOneRow(byte tag) {
@@ -622,7 +620,7 @@ public void closeOp(boolean abort) throws HiveException {
      MergeQueue mergeQueue = entry.getValue();
      Operator forwardOp = localWork.getAliasToWork().get(alias);
      forwardOp.close(abort);
-      mergeQueue.clearFetchContext();
+      mergeQueue.close();
    }
  }
@@ -695,16 +693,17 @@ public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf,
    // currently, number of paths is always the same (bucket numbers are all the same over
    // all partitions in a table).
    // But if hive supports assigning bucket number for each partition, this can be vary
-    public void setupContext(List paths) throws HiveException {
+    public void setupContext(String inputPath, List paths) throws HiveException {
      int segmentLen = paths.size();
      FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir());
      FetchOperator[] segments = segmentsForSize(segmentLen);
      for (int i = 0 ; i < segmentLen; i++) {
        Path path = paths.get(i);
        if (segments[i] == null) {
-          segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf));
+          ExecMapperContext context = getExecContext();
+          segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf), forwardOp, context);
        }
-        segments[i].setupContext(Arrays.asList(path));
+        segments[i].setupContext(inputPath, Arrays.asList(path));
      }
      initialize(segmentLen);
      for (int i = 0; i < segmentLen; i++) {
@@ -745,7 +744,7 @@ protected boolean lessThan(Object a, Object b) {
      return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0;
    }

-    public final InspectableObject getNextRow() throws IOException {
+    public final InspectableObject getNextRow() throws HiveException, IOException {
      if (currentMinSegment != null) {
        adjustPriorityQueue(currentMinSegment);
      }
@@ -758,8 +757,8 @@ public final InspectableObject getNextRow() throws IOException {
      return keys[currentMinSegment = current].getSecond();
    }

-    private void adjustPriorityQueue(Integer current) throws IOException {
-      if (nextIO(current)) {
+    private void adjustPriorityQueue(Integer current) throws HiveException, IOException {
+      if (next(current)) {
        adjustTop(); // sort
      } else {
        pop();
@@ -775,15 +774,6 @@ private boolean nextHive(Integer current) throws HiveException {
      }
    }

-    // wrapping for exception handling
-    private boolean nextIO(Integer current) throws IOException {
-      try {
-        return next(current);
-      } catch (HiveException e) {
-        throw new IOException(e);
-      }
-    }
-
    // return true if current min segment(FetchOperator) has next row
    private boolean next(Integer current) throws IOException, HiveException {
      if (keyFields == null) {
@@ -816,6 +806,17 @@ private boolean next(Integer current) throws IOException, HiveException {
      keys[current] = null;
      return false;
    }
+
+    public void close() throws HiveException {
+      if (segments != null) {
+        for (FetchOperator op : segments) {
+          if (op != null) {
+            op.close();
+          }
+        }
+      }
+      segments = null;
+    }
  }

  @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 2227e6f..5765de4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -542,7 +542,7 @@ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, H
      OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
      while (fetcher.pushRow()) { }
    } finally {
-      fetcher.clearFetchContext();
+      fetcher.close();
    }
  } else {
    throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
index 8b92f32..232e9a9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
@@ -17,11 +17,8 @@
 */
package org.apache.hadoop.hive.ql.exec.mr;

-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -46,12 +43,11 @@
  // if big alias is partitioned table, it's partition spec + bucket number
  private String fileId = null;
  private MapredLocalWork localWork = null;
-  private Map fetchOperators;
  private JobConf jc;

  private IOContext ioCxt;

-  private String currentBigBucketFile=null;
+  private String currentBigBucketFile;

  public String getCurrentBigBucketFile() {
    return currentBigBucketFile;
@@ -66,13 +62,19 @@ public ExecMapperContext(JobConf jc) {
    ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
  }

+  public void reset() {
+    lastInputPath = null;
+    currentInputPath = null;
+    inputFileChecked = false;
+  }
+
  public void clear() {
    IOContext.clear();
    ioCxt = null;
  }

  /**
-   * For CompbineFileInputFormat, the mapper's input file will be changed on the
+   * For CombineFileInputFormat, the mapper's input file will be changed on the
   * fly, and the input file name is passed to jobConf by shims/initNextRecordReader.
   * If the map local work has any mapping depending on the current
   * mapper's input file, the work need to clear context and re-initialization
@@ -140,14 +142,6 @@ public void setFileId(String fileId) {
    this.fileId = fileId;
  }

-  public Map getFetchOperators() {
-    return fetchOperators;
-  }
-
-  public void setFetchOperators(Map fetchOperators) {
-    this.fetchOperators = fetchOperators;
-  }
-
  public IOContext getIoCxt() {
    return ioCxt;
  }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
index 9581b72..d8603eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
@@ -35,8 +35,6 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -143,8 +141,8 @@ private void loadDirectly(MapJoinTableContainer[] mapJoinTables, String inputFil
        operator.setChildOperators(Arrays.>asList(sink));
      }
    }
-    localTask.setExecContext(context);
-    localTask.startForward(inputFileName);
+
+    localTask.startForward(context, inputFileName);
    MapJoinTableContainer[] tables = sink.getMapJoinTables();
    for (int i = 0; i < sink.getNumParent(); i++) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 9f3df99..c89eeef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -62,10 +62,7 @@
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
@@ -82,7 +79,6 @@
 */
public class MapredLocalTask extends Task implements Serializable {

-  private final Map fetchOperators = new HashMap();
  protected HadoopJobExecHelper jobExecHelper;
  private JobConf job;
  public static transient final Log l4j = LogFactory.getLog(MapredLocalTask.class);
@@ -92,10 +88,6 @@
  public static MemoryMXBean memoryMXBean;
  private static final Log LOG = LogFactory.getLog(MapredLocalTask.class);

-  // not sure we need this exec context; but all the operators in the work
-  // will pass this context throught
-  private ExecMapperContext execContext = null;
-
  private Process executor;

  public MapredLocalTask() {
@@ -108,15 +100,10 @@ public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) thro
    console = new LogHelper(LOG, isSilent);
  }

-  public void setExecContext(ExecMapperContext execContext) {
-    this.execContext = execContext;
-  }
-
  @Override
  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
    super.initialize(conf, queryPlan, driverContext);
    job = new JobConf(conf, ExecDriver.class);
-    execContext = new ExecMapperContext(job);
    //we don't use the HadoopJobExecHooks for local tasks
    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
  }
@@ -326,20 +313,15 @@ public int executeInProcess(DriverContext driverContext) {
      return -1;
    }

-    if (execContext == null) {
-      execContext = new ExecMapperContext(job);
-    }
-
    memoryMXBean = ManagementFactory.getMemoryMXBean();
    long startTime = System.currentTimeMillis();
    console.printInfo(Utilities.now() + "\tStarting to launch local task to process map join;\tmaximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-    execContext.setJc(job);
-    // set the local work, so all the operator can get this context
-    execContext.setLocalWork(work);
+
+    ExecMapperContext context = new ExecMapperContext(job);
    try {
-      startForward(null);
+      startForward(context, null);
      long currentTime = System.currentTimeMillis();
      long elapsed = currentTime - startTime;
      console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
@@ -349,128 +331,111 @@
          || (throwable instanceof MapJoinMemoryExhaustionException)) {
        l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
        return 3;
-      } else {
-        l4j.error("Hive Runtime Error: Map local work failed", throwable);
-        return 2;
      }
+      console.printError(org.apache.hadoop.util.StringUtils.stringifyException(throwable));
+      l4j.error("Hive Runtime Error: Map local work failed", throwable);
+      return 2;
+    } finally {
+      context.clear();
    }
    return 0;
  }

-  public void startForward(String bigTableBucket) throws Exception {
-    boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
-    initializeOperators(new HashMap());
+  public void startForward(ExecMapperContext execContext, String bigTableBucket)
+      throws Exception {
+
+    execContext.setJc(job);
+    // set the local work, so all the operator can get this context
+    execContext.setLocalWork(work);
+
+    Map fetchers = initializeOperators(execContext);
+
    // for each big table's bucket, call the start forward
-    if (inputFileChangeSenstive) {
+    if (work.getInputFileChangeSensitive()) {
      for (Map> bigTableBucketFiles : work
          .getBucketMapjoinContext().getAliasBucketFileNameMapping().values()) {
        if (bigTableBucket == null) {
          for (String bigTableBucketFile : bigTableBucketFiles.keySet()) {
-            startForward(inputFileChangeSenstive, bigTableBucketFile);
+            startForward(fetchers, bigTableBucketFile);
          }
        } else if (bigTableBucketFiles.keySet().contains(bigTableBucket)) {
-          startForward(inputFileChangeSenstive, bigTableBucket);
+          startForward(fetchers, bigTableBucket);
        }
      }
    } else {
-      startForward(inputFileChangeSenstive, null);
+      startForward(fetchers, null);
    }
  }

-  private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
+  private void startForward(Map fetchers, String bigTableBucket)
      throws Exception {
    for (Operator source : work.getAliasToWork().values()) {
      source.reset();
    }
-    if (inputFileChangeSenstive) {
-      execContext.setCurrentBigBucketFile(bigTableBucket);
-    }
-    for (Map.Entry entry : fetchOperators.entrySet()) {
+    for (Map.Entry entry : fetchers.entrySet()) {
      String alias = entry.getKey();
      FetchOperator fetchOp = entry.getValue();
-      if (inputFileChangeSenstive) {
-        fetchOp.clearFetchContext();
+      if (bigTableBucket != null) {
        setUpFetchOpContext(fetchOp, alias, bigTableBucket);
      }
-
-      // get the root operator
-      Operator forwardOp = work.getAliasToWork().get(alias);
-      // walk through the operator tree
-      while (!forwardOp.getDone()) {
-        InspectableObject row = fetchOp.getNextRow();
-        if (row == null) {
-          break;
-        }
-        forwardOp.processOp(row.o, 0);
+      try {
+        fetchOp.pushRows();
+      } finally {
+        fetchOp.clearFetchContext();
      }
-      forwardOp.flush();
    }
    for (Operator source : work.getAliasToWork().values()) {
-      source.close(false);
+      source.close(false); // flush to file
    }
  }

-  private void initializeOperators(Map fetchOpJobConfMap)
+  private Map initializeOperators(ExecMapperContext context)
      throws HiveException {
-    for (Map.Entry> entry : work.getAliasToWork().entrySet()) {
-      LOG.debug("initializeOperators: " + entry.getKey() + ", children = " + entry.getValue().getChildOperators());
-    }
-    // this mapper operator is used to initialize all the operators
+
+    Map fetchOperators = new HashMap();
+
+    // all operators should be initialized first before fetching
    for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) {
      if (entry.getValue() == null) {
        continue;
      }
      JobConf jobClone = new JobConf(job);
-      TableScanOperator ts = (TableScanOperator)work.getAliasToWork().get(entry.getKey());
+      Operator forwardOp = work.getAliasToWork().get(entry.getKey());

      // push down projections
-      ColumnProjectionUtils.appendReadColumns(
-          jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns());
-      // push down filters
-      HiveInputFormat.pushFilters(jobClone, ts);
-
-      // create a fetch operator
-      FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
-      fetchOpJobConfMap.put(fetchOp, jobClone);
-      fetchOperators.put(entry.getKey(), fetchOp);
-      l4j.info("fetchoperator for " + entry.getKey() + " created");
-    }
-    // initialize all forward operator
-    for (Map.Entry entry : fetchOperators.entrySet()) {
-      // get the forward op
-      String alias = entry.getKey();
-      Operator forwardOp = work.getAliasToWork().get(alias);
+      if (forwardOp instanceof TableScanOperator) {
+        TableScanOperator ts = (TableScanOperator) forwardOp;
+        ColumnProjectionUtils.appendReadColumns(
+            jobClone, ts.getNeededColumnIDs(), ts.getNeededColumns());
+        // push down filters
+        HiveInputFormat.pushFilters(jobClone, ts);
+      }
+      // create a fetch operator & put the exe context into all the operators
+      FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone, forwardOp, context);

-      // put the exe context into all the operators
-      forwardOp.setExecContext(execContext);
      // All the operators need to be initialized before process
-      FetchOperator fetchOp = entry.getValue();
-      JobConf jobConf = fetchOpJobConfMap.get(fetchOp);
-
-      if (jobConf == null) {
-        jobConf = job;
-      }
-      // initialize the forward operator
      ObjectInspector objectInspector = fetchOp.getOutputObjectInspector();
-      forwardOp.initialize(jobConf, new ObjectInspector[] {objectInspector});
+      forwardOp.initialize(jobClone, new ObjectInspector[]{objectInspector});
+
+      fetchOperators.put(entry.getKey(), fetchOp);
      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
    }
+    return fetchOperators;
  }

  private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)
      throws Exception {
-    BucketMapJoinContext bucketMatcherCxt = this.work.getBucketMapjoinContext();
+    BucketMapJoinContext bucketMatcherCxt = work.getBucketMapjoinContext();
    Class bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
-    BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls,
-        null);
+    BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null);
    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt.getAliasBucketFileNameMapping());
    List aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt
        .getMapJoinBigTableAlias(), alias);
-    fetchOp.setupContext(aliasFiles);
+    fetchOp.setupContext(currentInputFile, aliasFiles);
  }

  @Override
@@ -490,7 +455,6 @@ public String getName() {
  }

  @Override
  public StageType getType() {
-    //assert false;
    return StageType.MAPREDLOCAL;
  }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
index 3cae727..6805799 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
@@ -20,8 +20,6 @@
import java.io.Serializable;

-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
-
/**
 * ColumnStats Work.
 *
 */
@@ -66,14 +64,6 @@ public void setColStats(ColumnStatsDesc colStats) {
    this.colStats = colStats;
  }

-  public ListSinkOperator getSink() {
-    return fWork.getSink();
-  }
-
-  public void initializeForFetch() {
-    fWork.initializeForFetch();
-  }
-
  public int getLeastNumRows() {
    return fWork.getLeastNumRows();
  }
diff --git ql/src/test/queries/clientpositive/join_vc.q ql/src/test/queries/clientpositive/join_vc.q
index 3f2190e..2972ecc 100644
--- ql/src/test/queries/clientpositive/join_vc.q
+++ ql/src/test/queries/clientpositive/join_vc.q
@@ -12,3 +12,12 @@ from src t1 join src t2 on t1.key = t2.key where t1.key < 100 order by t2.BLOCK_

select t2.BLOCK__OFFSET__INSIDE__FILE
from src t1 join src t2 on t1.key = t2.key where t1.key < 100 order by t2.BLOCK__OFFSET__INSIDE__FILE;
+
+set hive.auto.convert.join=true;
+
+-- HIVE-4790 MapredLocalTask does not make virtual columns
+
+explain
+SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE;
+SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE;
+
diff --git ql/src/test/results/clientpositive/join_vc.q.out ql/src/test/results/clientpositive/join_vc.q.out
index c0f11e7..6519842 100644
--- ql/src/test/results/clientpositive/join_vc.q.out
+++ ql/src/test/results/clientpositive/join_vc.q.out
@@ -394,3 +394,174 @@ POSTHOOK: Input: default@src
968
968
968
+PREHOOK: query: -- HIVE-4790 MapredLocalTask does not make virtual columns
+
+explain
+SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE
+PREHOOK: type: QUERY
+POSTHOOK: query: -- HIVE-4790 MapredLocalTask does not make virtual columns
+
+explain
+SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-5 is a root stage
+  Stage-2 depends on stages: Stage-5
+  Stage-0 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-5
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        $hdt$_0:a
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        $hdt$_0:a
+          TableScan
+            alias: a
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (UDFToDouble(key) < 50.0) (type: boolean)
+              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string), BLOCK__OFFSET__INSIDE__FILE (type: bigint)
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                HashTable Sink Operator
+                  keys:
+                    0 _col0 (type: string)
+                    1 _col0 (type: string)
+
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: a
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (UDFToDouble(key) < 50.0) (type: boolean)
+              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: string)
+                    1 _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                  Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), _col3 (type: string), _col4 (type: string), _col2 (type: bigint)
+                    outputColumnNames: _col0, _col1, _col2, _col3, _col4
+                    Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string), _col4 (type: bigint)
+                      sort order: ++
+                      Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col1 (type: string), _col2 (type: string), _col3 (type: string)
+      Local Work:
+        Map Reduce Local Work
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string), VALUE._col1 (type: string), VALUE._col2 (type: string), KEY.reducesinkkey1 (type: bigint)
+          outputColumnNames: _col0, _col1, _col2, _col3, _col4
+          Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT *,a.BLOCK__OFFSET__INSIDE__FILE FROM src a JOIN src b ON a.key = b.key where a.key < 50 order by a.key, a.BLOCK__OFFSET__INSIDE__FILE
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0 val_0 0 val_0 2088
+0 val_0 0 val_0 2088
+0 val_0 0 val_0 2088
+0 val_0 0 val_0 2632
+0 val_0 0 val_0 2632
+0 val_0 0 val_0 2632
+0 val_0 0 val_0 968
+0 val_0 0 val_0 968
+0 val_0 0 val_0 968
+10 val_10 10 val_10 2846
+11 val_11 11 val_11 3170
+12 val_12 12 val_12 1720
+12 val_12 12 val_12 1720
+12 val_12 12 val_12 4362
+12 val_12 12 val_12 4362
+15 val_15 15 val_15 2770
+15 val_15 15 val_15 2770
+15 val_15 15 val_15 386
+15 val_15 15 val_15 386
+17 val_17 17 val_17 910
+18 val_18 18 val_18 5340
+18 val_18 18 val_18 5340
+18 val_18 18 val_18 5514
+18 val_18 18 val_18 5514
+19 val_19 19 val_19 2824
+2 val_2 2 val_2 4004
+20 val_20 20 val_20 1118
+24 val_24 24 val_24 1972
+24 val_24 24 val_24 1972
+24 val_24 24 val_24 4594
+24 val_24 24 val_24 4594
+26 val_26 26 val_26 2226
+26 val_26 26 val_26 2226
+26 val_26 26 val_26 5284
+26 val_26 26 val_26 5284
+27 val_27 27 val_27 34
+28 val_28 28 val_28 5616
+30 val_30 30 val_30 3494
+33 val_33 33 val_33 3592
+34 val_34 34 val_34 3192
+35 val_35 35 val_35 1238
+35 val_35 35 val_35 1238
+35 val_35 35 val_35 1238
+35 val_35 35 val_35 3138
+35 val_35 35 val_35 3138
+35 val_35 35 val_35 3138
+35 val_35 35 val_35 4012
+35 val_35 35 val_35 4012
+35 val_35 35 val_35 4012
+37 val_37 37 val_37 328
+37 val_37 37 val_37 328
+37 val_37 37 val_37 5626
+37 val_37 37 val_37 5626
+4 val_4 4 val_4 1218
+41 val_41 41 val_41 3388
+42 val_42 42 val_42 2030
+42 val_42 42 val_42 2030
+42 val_42 42 val_42 3298
+42 val_42 42 val_42 3298
+43 val_43 43 val_43 2330
+44 val_44 44 val_44 4068
+47 val_47 47 val_47 1198
+5 val_5 5 val_5 3060
+5 val_5 5 val_5 3060
+5 val_5 5 val_5 3060
+5 val_5 5 val_5 3864
+5 val_5 5 val_5 3864
+5 val_5 5 val_5 3864
+5 val_5 5 val_5 4540
+5 val_5 5 val_5 4540
+5 val_5 5 val_5 4540
+8 val_8 8 val_8 1916
+9 val_9 9 val_9 5398