diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index ac3d686..15e8a13 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -87,7 +87,7 @@ protected transient int maxPartitions; protected transient ListBucketingCtx lbCtx; protected transient boolean isSkewedStoredAsSubDirectories; - private transient boolean statsCollectRawDataSize; + protected transient boolean statsCollectRawDataSize; private static final transient String[] FATAL_ERR_MSG = { @@ -220,6 +220,10 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi } } } + + public Stat getStat() { + return stat; + } } // class FSPaths private static final long serialVersionUID = 1L; @@ -227,7 +231,7 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi protected transient Serializer serializer; protected transient BytesWritable commonKey = new BytesWritable(); protected transient TableIdEnum tabIdEnum = null; - private transient LongWritable row_count; + protected transient LongWritable row_count; private transient boolean isNativeTable = true; /** @@ -236,17 +240,17 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi * each reducer can write 10 files - this way we effectively get 1000 files. */ private transient ExprNodeEvaluator[] partitionEval; - private transient int totalFiles; + protected transient int totalFiles; private transient int numFiles; - private transient boolean multiFileSpray; - private transient final Map bucketMap = new HashMap(); + protected transient boolean multiFileSpray; + protected transient final Map bucketMap = new HashMap(); private transient ObjectInspector[] partitionObjectInspectors; - private transient HivePartitioner prtner; - private transient final HiveKey key = new HiveKey(); + protected transient HivePartitioner prtner; + protected transient final HiveKey key = new HiveKey(); private transient Configuration hconf; - private transient FSPaths fsp; - private transient boolean bDynParts; + protected transient FSPaths fsp; + protected transient boolean bDynParts; private transient SubStructObjectInspector subSetOI; private transient int timeOut; // JT timeout in msec. private transient long lastProgressReport = System.currentTimeMillis(); @@ -278,7 +282,7 @@ public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws Hi Class outputClass; String taskId; - private boolean filesCreated = false; + protected boolean filesCreated = false; private void initializeSpecPath() { // For a query of the type: @@ -431,7 +435,7 @@ private void dpSetup() { } } - private void createBucketFiles(FSPaths fsp) throws HiveException { + protected void createBucketFiles(FSPaths fsp) throws HiveException { try { int filesIdx = 0; Set seenBuckets = new HashSet(); @@ -543,7 +547,7 @@ private void createBucketFiles(FSPaths fsp) throws HiveException { * * @return true if a new progress update is reported, false otherwise. 
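The private-to-protected changes in the FileSinkOperator hunk above, together with the new FSPaths.getStat() accessor, exist so that a subclass in another package can reuse the file-sink machinery instead of carrying its own copy. A minimal sketch of the access this enables (hypothetical class name; it only mirrors what the VectorFileSinkOperator refactored later in this patch does):

package org.apache.hadoop.hive.ql.exec.vector;   // a different package, so "protected" is required

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;

public class FileSinkSubclassSketch extends FileSinkOperator {
  private static final long serialVersionUID = 1L;

  @Override
  public void processOp(Object row, int tag) throws HiveException {
    if (!filesCreated) {                  // protected after this patch
      createBucketFiles(fsp);             // protected after this patch
    }
    rowOutWriters = fsp.getOutWriters();  // public accessor on FSPaths
    fsp.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);  // via the new getStat() accessor
  }
}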
*/ - private boolean updateProgress() { + protected boolean updateProgress() { if (reporter != null && (System.currentTimeMillis() - lastProgressReport) > timeOut) { reporter.progress(); @@ -554,7 +558,7 @@ private boolean updateProgress() { } } - Writable recordValue; + protected Writable recordValue; @Override public void processOp(Object row, int tag) throws HiveException { @@ -660,7 +664,7 @@ public void processOp(Object row, int tag) throws HiveException { * @return * @throws HiveException */ - private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { + protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { FSPaths fsp2 = valToPaths.get(lbDirName); if (fsp2 == null) { fsp2 = createNewPaths(lbDirName); @@ -698,7 +702,7 @@ private FSPaths createNewPaths(String dirName) throws HiveException { * @param row row to process. * @return directory name. */ - private String generateListBucketingDirName(Object row) { + protected String generateListBucketingDirName(Object row) { if (!this.isSkewedStoredAsSubDirectories) { return null; } @@ -739,7 +743,7 @@ private String generateListBucketingDirName(Object row) { return lbDirName; } - private FSPaths getDynOutPaths(List row, String lbDirName) throws HiveException { + protected FSPaths getDynOutPaths(List row, String lbDirName) throws HiveException { FSPaths fp; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 85a22b7..8ab5395 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -22,12 +22,17 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.plan.CollectDesc; -import org.apache.hadoop.hive.ql.plan.MuxDesc; import org.apache.hadoop.hive.ql.plan.DemuxDesc; import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; -import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.ForwardDesc; @@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc; import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.MuxDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; @@ -106,6 +112,38 @@ public OpTuple(Class descClass, Class> opClass) { MuxOperator.class)); } + public static ArrayList vectorOpvec; + static { + vectorOpvec = new ArrayList(); + vectorOpvec.add(new OpTuple(SelectDesc.class, VectorSelectOperator.class)); + vectorOpvec.add(new OpTuple(GroupByDesc.class, VectorGroupByOperator.class)); + vectorOpvec.add(new OpTuple(ReduceSinkDesc.class, + VectorReduceSinkOperator.class)); + vectorOpvec.add(new OpTuple(FileSinkDesc.class, 
VectorFileSinkOperator.class)); + vectorOpvec.add(new OpTuple(FilterDesc.class, VectorFilterOperator.class)); + } + + public static Operator getVectorOperator(T conf, + VectorizationContext vContext) { + Class descClass = (Class) conf.getClass(); + for (OpTuple o : vectorOpvec) { + if (o.descClass == descClass) { + try { + Operator op = (Operator) o.opClass.getDeclaredConstructor( + VectorizationContext.class, OperatorDesc.class).newInstance( + vContext, conf); + op.initializeCounters(); + return op; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + throw new RuntimeException("No vector operator for descriptor class " + + descClass.getName()); + } + public static Operator get(Class opClass) { for (OpTuple o : opvec) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 54b546d..7923fa0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -25,7 +25,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Enumeration; @@ -57,10 +56,6 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; @@ -243,22 +238,7 @@ public int execute(DriverContext driverContext) { //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput() job.setOutputFormat(HiveOutputFormatImpl.class); - - boolean vectorPath = HiveConf.getBoolVar(job, - HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - - if (vectorPath) { - if (validateVectorPath()) { - LOG.info("Going down the vectorization path"); - job.setMapperClass(VectorExecMapper.class); - } else { - //fall back to non-vector mode - HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); - job.setMapperClass(ExecMapper.class); - } - } else { - job.setMapperClass(ExecMapper.class); - } + job.setMapperClass(ExecMapper.class); job.setMapOutputKeyClass(HiveKey.class); job.setMapOutputValueClass(BytesWritable.class); @@ -510,59 +490,6 @@ public int execute(DriverContext driverContext) { return (returnVal); } - private boolean validateVectorPath() { - LOG.debug("Validating if vectorized execution is applicable"); - MapWork thePlan = this.getWork().getMapWork(); - - for (String path : thePlan.getPathToPartitionInfo().keySet()) { - PartitionDesc pd = thePlan.getPathToPartitionInfo().get(path); - List> interfaceList = - Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); - if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { - LOG.debug("Input format: " + pd.getInputFileFormatClassName() - + ", doesn't provide vectorized input"); - return false; - } - } - VectorizationContext vc = new VectorizationContext(null, 0); - for (String onefile : thePlan.getPathToAliases().keySet()) { - List aliases = 
thePlan.getPathToAliases().get(onefile); - for (String onealias : aliases) { - Operator op = thePlan.getAliasToWork().get( - onealias); - Operator vectorOp = null; - try { - vectorOp = VectorMapOperator.vectorizeOperator(op, vc); - } catch (Exception e) { - LOG.debug("Cannot vectorize the plan", e); - return false; - } - if (vectorOp == null) { - LOG.debug("Cannot vectorize the plan"); - return false; - } - //verify the expressions contained in the operators - try { - validateVectorOperator(vectorOp); - } catch (HiveException e) { - LOG.debug("Cannot vectorize the plan", e); - return false; - } - } - } - return true; - } - - private void validateVectorOperator(Operator vectorOp) - throws HiveException { - vectorOp.initialize(job, null); - if (vectorOp.getChildOperators() != null) { - for (Operator vop : vectorOp.getChildOperators()) { - validateVectorOperator(vop); - } - } - } - private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf) throws Exception { assert mWork.getAliasToWork().keySet().size() == 1; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java deleted file mode 100644 index 1a5a5ec..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.net.URLClassLoader; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.exec.FetchOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.plan.MapredLocalWork; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.StringUtils; -/** - * ExecMapper. 
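With the vectorization branch removed from ExecDriver above and VectorExecMapper deleted here, vectorized operators are now obtained through the new OperatorFactory.getVectorOperator() shown earlier; the component that decides when to call it is not part of this section. A hypothetical sketch of such a caller (the two-argument VectorizationContext construction mirrors the removed validateVectorPath() code):

// Hypothetical helper, not part of this patch.
static Operator<? extends OperatorDesc> toVectorOperator(OperatorDesc desc) {
  VectorizationContext vContext = new VectorizationContext(null, 0);  // as in the removed validateVectorPath()
  // getVectorOperator() looks the descriptor class up in vectorOpvec and instantiates the
  // matching Vector*Operator through its (VectorizationContext, OperatorDesc) constructor,
  // so it only works for the descriptors registered there (Select, GroupBy, ReduceSink, FileSink, Filter).
  return OperatorFactory.getVectorOperator(desc, vContext);
}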
- * - */ -public class VectorExecMapper extends MapReduceBase implements Mapper { - - private VectorMapOperator mo; - private Map fetchOperators; - private OutputCollector oc; - private JobConf jc; - private boolean abort = false; - private Reporter rp; - public static final Log l4j = LogFactory.getLog("VectorExecMapper"); - private static boolean done; - - // used to log memory usage periodically - public static MemoryMXBean memoryMXBean; - private long numRows = 0; - private long nextCntr = 1; - private MapredLocalWork localWork = null; - private boolean isLogInfoEnabled = false; - - private final ExecMapperContext execContext = new ExecMapperContext(); - - @Override - public void configure(JobConf job) { - // Allocate the bean at the beginning - - memoryMXBean = ManagementFactory.getMemoryMXBean(); - l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - - isLogInfoEnabled = l4j.isInfoEnabled(); - - try { - l4j.info("conf classpath = " - + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - l4j.info("thread classpath = " - + Arrays.asList(((URLClassLoader) Thread.currentThread() - .getContextClassLoader()).getURLs())); - } catch (Exception e) { - l4j.info("cannot get classpath: " + e.getMessage()); - } - try { - jc = job; - execContext.setJc(jc); - // create map and fetch operators - MapredWork mrwork = Utilities.getMapRedWork(job); - mo = new VectorMapOperator(); - mo.setConf(mrwork); - // initialize map operator - mo.setChildren(job); - l4j.info(mo.dump(0)); - // initialize map local work - localWork = mrwork.getMapWork().getMapLocalWork(); - execContext.setLocalWork(localWork); - - mo.setExecContext(execContext); - mo.initializeLocalWork(jc); - mo.initialize(jc, null); - - if (localWork == null) { - return; - } - - //The following code is for mapjoin - //initialize all the dummy ops - l4j.info("Initializing dummy operator"); - List> dummyOps = localWork.getDummyParentOp(); - for (Operator dummyOp : dummyOps){ - dummyOp.setExecContext(execContext); - dummyOp.initialize(jc,null); - } - - - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // will this be true here? - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - throw new RuntimeException("Map operator initialization failed", e); - } - } - } - - public void map(Object key, Object value, OutputCollector output, - Reporter reporter) throws IOException { - if (oc == null) { - oc = output; - rp = reporter; - mo.setOutputCollector(oc); - mo.setReporter(rp); - } - // reset the execContext for each new row - execContext.resetRow(); - - try { - if (mo.getDone()) { - done = true; - } else { - // Since there is no concept of a group, we don't invoke - // startGroup/endGroup for a mapper - mo.process(value); - if (isLogInfoEnabled) { - numRows++; - if (numRows == nextCntr) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processing " + numRows - + " rows: used memory = " + used_memory); - nextCntr = getNextCntr(numRows); - } - } - } - } catch (Throwable e) { - abort = true; - if (e instanceof OutOfMemoryError) { - // Don't create a new object if we are already out of memory - throw (OutOfMemoryError) e; - } else { - l4j.fatal(StringUtils.stringifyException(e)); - throw new RuntimeException(e); - } - } - } - - - private long getNextCntr(long cntr) { - // A very simple counter to keep track of number of rows processed by the - // reducer. 
It dumps - // every 1 million times, and quickly before that - if (cntr >= 1000000) { - return cntr + 1000000; - } - - return 10 * cntr; - } - - @Override - public void close() { - // No row was processed - if (oc == null) { - l4j.trace("Close called. no row processed by map."); - } - - // check if there are IOExceptions - if (!abort) { - abort = execContext.getIoCxt().getIOExceptions(); - } - - // detecting failed executions by exceptions thrown by the operator tree - // ideally hadoop should let us know whether map execution failed or not - try { - mo.close(abort); - - //for close the local work - if(localWork != null){ - List> dummyOps = localWork.getDummyParentOp(); - - for (Operator dummyOp : dummyOps){ - dummyOp.close(abort); - } - } - - if (fetchOperators != null) { - MapredLocalWork localWork = mo.getConf().getMapWork().getMapLocalWork(); - for (Map.Entry entry : fetchOperators.entrySet()) { - Operator forwardOp = localWork - .getAliasToWork().get(entry.getKey()); - forwardOp.close(abort); - } - } - - if (isLogInfoEnabled) { - long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed(); - l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " - + used_memory); - } - - reportStats rps = new reportStats(rp); - mo.preorderMap(rps); - return; - } catch (Exception e) { - if (!abort) { - // signal new failure to map-reduce - l4j.error("Hit error while closing operators - failing tree"); - throw new RuntimeException("Hive Runtime Error while closing operators", e); - } - } - } - - public static boolean getDone() { - return done; - } - - public boolean isAbort() { - return abort; - } - - public void setAbort(boolean abort) { - this.abort = abort; - } - - public static void setDone(boolean done) { - VectorExecMapper.done = done; - } - - /** - * reportStats. 
- * - */ - public static class reportStats implements Operator.OperatorFunc { - Reporter rp; - - public reportStats(Reporter rp) { - this.rp = rp; - } - - public void func(Operator op) { - Map opStats = op.getStats(); - for (Map.Entry e : opStats.entrySet()) { - if (rp != null) { - rp.incrCounter(e.getKey(), e.getValue()); - } - } - } - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java index 780ff05..a012194 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java @@ -19,554 +19,41 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; -import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; -import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack; -import org.apache.hadoop.hive.ql.exec.Stat; -import org.apache.hadoop.hive.ql.exec.TerminalOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.HiveKey; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.HivePartitioner; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; -import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.ql.stats.StatsSetupConst; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.ReflectionUtils; /** * File Sink operator implementation. **/ -public class VectorFileSinkOperator extends TerminalOperator implements - Serializable { +public class VectorFileSinkOperator extends FileSinkOperator { - protected transient HashMap valToPaths; - protected transient int numDynParts; - protected transient List dpColNames; - protected transient DynamicPartitionCtx dpCtx; - protected transient boolean isCompressed; - protected transient Path parent; - protected transient HiveOutputFormat hiveOutputFormat; - protected transient Path specPath; - protected transient String childSpecPathDynLinkedPartitions; - protected transient int dpStartCol; // start column # for DP columns - protected transient List dpVals; // array of values corresponding to DP columns - protected transient List dpWritables; - protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters - protected transient int maxPartitions; - protected transient ListBucketingCtx lbCtx; - protected transient boolean isSkewedStoredAsSubDirectories; - private transient boolean statsCollectRawDataSize; - - private static final transient String[] FATAL_ERR_MSG = { - null, // counter value 0 means no error - "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions" + - ".pernode." - }; - private final VectorizationContext vContext; + private static final long serialVersionUID = 1L; public VectorFileSinkOperator(VectorizationContext context, OperatorDesc conf) { - this.vContext = context; + super(); this.conf = (FileSinkDesc) conf; } - public class FSPaths implements Cloneable { - Path tmpPath; - Path taskOutputTempPath; - Path[] outPaths; - Path[] finalPaths; - RecordWriter[] outWriters; - Stat stat; - - public FSPaths() { - } - - public FSPaths(Path specPath) { - tmpPath = Utilities.toTempPath(specPath); - taskOutputTempPath = Utilities.toTaskTempPath(specPath); - outPaths = new Path[numFiles]; - finalPaths = new Path[numFiles]; - outWriters = new RecordWriter[numFiles]; - stat = new Stat(); - } - - /** - * Update OutPath according to tmpPath. - */ - public Path getTaskOutPath(String taskId) { - return getOutPath(taskId, this.taskOutputTempPath); - } - - - /** - * Update OutPath according to tmpPath. - */ - public Path getOutPath(String taskId) { - return getOutPath(taskId, this.tmpPath); - } - - /** - * Update OutPath according to tmpPath. - */ - public Path getOutPath(String taskId, Path tmp) { - return new Path(tmp, Utilities.toTempPath(taskId)); - } - - /** - * Update the final paths according to tmpPath. - */ - public Path getFinalPath(String taskId) { - return getFinalPath(taskId, this.tmpPath, null); - } - - /** - * Update the final paths according to tmpPath. 
- */ - public Path getFinalPath(String taskId, Path tmpPath, String extension) { - if (extension != null) { - return new Path(tmpPath, taskId + extension); - } else { - return new Path(tmpPath, taskId); - } - } - - public void setOutWriters(RecordWriter[] out) { - outWriters = out; - } - - public RecordWriter[] getOutWriters() { - return outWriters; - } - - public void closeWriters(boolean abort) throws HiveException { - for (int idx = 0; idx < outWriters.length; idx++) { - if (outWriters[idx] != null) { - try { - outWriters[idx].close(abort); - updateProgress(); - } catch (IOException e) { - throw new HiveException(e); - } - } - } - } - - private void commit(FileSystem fs) throws HiveException { - for (int idx = 0; idx < outPaths.length; ++idx) { - try { - if ((bDynParts || isSkewedStoredAsSubDirectories) - && !fs.exists(finalPaths[idx].getParent())) { - fs.mkdirs(finalPaths[idx].getParent()); - } - if (!fs.rename(outPaths[idx], finalPaths[idx])) { - throw new HiveException("Unable to rename output from: " + - outPaths[idx] + " to: " + finalPaths[idx]); - } - updateProgress(); - } catch (IOException e) { - throw new HiveException("Unable to rename output from: " + - outPaths[idx] + " to: " + finalPaths[idx], e); - } - } - } - - public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException { - for (int idx = 0; idx < outWriters.length; idx++) { - if (outWriters[idx] != null) { - try { - outWriters[idx].close(abort); - if (delete) { - fs.delete(outPaths[idx], true); - } - updateProgress(); - } catch (IOException e) { - throw new HiveException(e); - } - } - } - } - } // class FSPaths - - private static final long serialVersionUID = 1L; - protected transient FileSystem fs; - protected transient Serializer serializer; - protected transient BytesWritable commonKey = new BytesWritable(); - protected transient TableIdEnum tabIdEnum = null; - private transient LongWritable row_count; - private transient boolean isNativeTable = true; - - /** - * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets, - * it is not a good idea to start so many reducers - if the maximum number of reducers is 100, - * each reducer can write 10 files - this way we effectively get 1000 files. - */ - private transient ExprNodeEvaluator[] partitionEval; - private transient int totalFiles; - private transient int numFiles; - private transient boolean multiFileSpray; - private transient final Map bucketMap = new HashMap(); - - private transient ObjectInspector[] partitionObjectInspectors; - private transient HivePartitioner prtner; - private transient final HiveKey key = new HiveKey(); - private transient Configuration hconf; - private transient FSPaths fsp; - private transient boolean bDynParts; - private transient SubStructObjectInspector subSetOI; - private transient int timeOut; // JT timeout in msec. - private transient long lastProgressReport = System.currentTimeMillis(); - - /** - * TableIdEnum. 
- * - */ - public static enum TableIdEnum { - TABLE_ID_1_ROWCOUNT, - TABLE_ID_2_ROWCOUNT, - TABLE_ID_3_ROWCOUNT, - TABLE_ID_4_ROWCOUNT, - TABLE_ID_5_ROWCOUNT, - TABLE_ID_6_ROWCOUNT, - TABLE_ID_7_ROWCOUNT, - TABLE_ID_8_ROWCOUNT, - TABLE_ID_9_ROWCOUNT, - TABLE_ID_10_ROWCOUNT, - TABLE_ID_11_ROWCOUNT, - TABLE_ID_12_ROWCOUNT, - TABLE_ID_13_ROWCOUNT, - TABLE_ID_14_ROWCOUNT, - TABLE_ID_15_ROWCOUNT; - } - - protected transient boolean autoDelete = false; - protected transient JobConf jc; - Class outputClass; - String taskId; - - private boolean filesCreated = false; - - private void initializeSpecPath() { - // For a query of the type: - // insert overwrite table T1 - // select * from (subq1 union all subq2)u; - // subQ1 and subQ2 write to directories Parent/Child_1 and - // Parent/Child_2 respectively, and union is removed. - // The movetask that follows subQ1 and subQ2 tasks moves the directory - // 'Parent' + public VectorFileSinkOperator() { - // However, if the above query contains dynamic partitions, subQ1 and - // subQ2 have to write to directories: Parent/DynamicPartition/Child_1 - // and Parent/DynamicPartition/Child_1 respectively. - // The movetask that follows subQ1 and subQ2 tasks still moves the directory - // 'Parent' - if ((!conf.isLinkedFileSink()) || (dpCtx == null)) { - specPath = new Path(conf.getDirName()); - childSpecPathDynLinkedPartitions = null; - return; - } - - specPath = new Path(conf.getParentDir()); - childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName()); } @Override - protected void initializeOp(Configuration hconf) throws HiveException { - try { - this.hconf = hconf; - filesCreated = false; - isNativeTable = !conf.getTableInfo().isNonNative(); - multiFileSpray = conf.isMultiFileSpray(); - totalFiles = conf.getTotalFiles(); - numFiles = conf.getNumFiles(); - dpCtx = conf.getDynPartCtx(); - lbCtx = conf.getLbCtx(); - valToPaths = new HashMap(); - taskId = Utilities.getTaskId(hconf); - initializeSpecPath(); - fs = specPath.getFileSystem(hconf); - hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); - isCompressed = conf.getCompressed(); - parent = Utilities.toTempPath(conf.getDirName()); - statsCollectRawDataSize = conf.isStatsCollectRawDataSize(); - - serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); - serializer.initialize(null, conf.getTableInfo().getProperties()); - outputClass = serializer.getSerializedClass(); - - // Timeout is chosen to make sure that even if one iteration takes more than - // half of the script.timeout but less than script.timeout, we will still - // be able to report progress. 
- timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2; - - if (hconf instanceof JobConf) { - jc = (JobConf) hconf; - } else { - // test code path - jc = new JobConf(hconf, ExecDriver.class); - } - - if (multiFileSpray) { - partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; - int i = 0; - for (ExprNodeDesc e : conf.getPartitionCols()) { - partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); - } - - partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector); - prtner = (HivePartitioner) ReflectionUtils.newInstance( - jc.getPartitionerClass(), null); - } - int id = conf.getDestTableId(); - if ((id != 0) && (id <= TableIdEnum.values().length)) { - String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT"; - tabIdEnum = TableIdEnum.valueOf(enumName); - row_count = new LongWritable(); - statsMap.put(tabIdEnum, row_count); - } - - if (dpCtx != null) { - dpSetup(); - } - - if (lbCtx != null) { - lbSetup(); - } - - if (!bDynParts) { - fsp = new FSPaths(specPath); - - // Create all the files - this is required because empty files need to be created for - // empty buckets - // createBucketFiles(fsp); - if (!this.isSkewedStoredAsSubDirectories) { - valToPaths.put("", fsp); // special entry for non-DP case - } - } - - initializeChildren(hconf); - } catch (HiveException e) { - throw e; - } catch (Exception e) { - e.printStackTrace(); - throw new HiveException(e); - } - } - - /** - * Initialize list bucketing information - */ - private void lbSetup() { - this.isSkewedStoredAsSubDirectories = ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir()); - } - - /** - * Set up for dynamic partitioning including a new ObjectInspector for the output row. - */ - private void dpSetup() { - - this.bDynParts = false; - this.numDynParts = dpCtx.getNumDPCols(); - this.dpColNames = dpCtx.getDPColNames(); - this.maxPartitions = dpCtx.getMaxPartitionsPerNode(); - - assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping"; - - if (dpColNames != null && dpColNames.size() > 0) { - this.bDynParts = true; - assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has " - + inputObjInspectors.length; - StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0]; - // remove the last dpMapping.size() columns from the OI - List fieldOI = soi.getAllStructFieldRefs(); - ArrayList newFieldsOI = new ArrayList(); - ArrayList newFieldsName = new ArrayList(); - this.dpStartCol = 0; - for (StructField sf : fieldOI) { - String fn = sf.getFieldName(); - if (!dpCtx.getInputToDPCols().containsKey(fn)) { - newFieldsOI.add(sf.getFieldObjectInspector()); - newFieldsName.add(sf.getFieldName()); - this.dpStartCol++; - } - } - assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty"; - - this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol); - this.dpVals = new ArrayList(numDynParts); - this.dpWritables = new ArrayList(numDynParts); - } - } - - private void createBucketFiles(FSPaths fsp) throws HiveException { - try { - int filesIdx = 0; - Set seenBuckets = new HashSet(); - for (int idx = 0; idx < totalFiles; idx++) { - if (this.getExecContext() != null && this.getExecContext().getFileId() != null) { - LOG.info("replace taskId from execContext "); - - taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId()); - - LOG.info("new taskId: FS " + taskId); - - assert !multiFileSpray; - assert totalFiles == 1; - } - - if 
(multiFileSpray) { - key.setHashCode(idx); - - // Does this hashcode belong to this reducer - int numReducers = totalFiles / numFiles; - - if (numReducers > 1) { - int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities - .getTaskId(hconf))); - - int reducerIdx = prtner.getPartition(key, null, numReducers); - if (currReducer != reducerIdx) { - continue; - } - } - - int bucketNum = prtner.getBucket(key, null, totalFiles); - if (seenBuckets.contains(bucketNum)) { - continue; - } - seenBuckets.add(bucketNum); - - bucketMap.put(bucketNum, filesIdx); - taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum); - } - if (isNativeTable) { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId); - LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); - fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId); - LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]); - } else { - fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath; - } - try { - // The reason to keep these instead of using - // OutputFormat.getRecordWriter() is that - // getRecordWriter does not give us enough control over the file name that - // we create. - String extension = Utilities.getFileExtension(jc, isCompressed, - hiveOutputFormat); - if (!bDynParts && !this.isSkewedStoredAsSubDirectories) { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension); - } else { - fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension); - } - - } catch (Exception e) { - e.printStackTrace(); - throw new HiveException(e); - } - LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); - - if (isNativeTable) { - try { - // in recent hadoop versions, use deleteOnExit to clean tmp files. - autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit( - fs, fsp.outPaths[filesIdx]); - } catch (IOException e) { - throw new HiveException(e); - } - } - - Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc); - // only create bucket files only if no dynamic partitions, - // buckets of dynamic partitions will be created for each newly created partition - fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter( - jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], - reporter); - // increment the CREATED_FILES counter - if (reporter != null) { - reporter.incrCounter(ProgressCounter.CREATED_FILES, 1); - } - filesIdx++; - } - assert filesIdx == numFiles; - - // in recent hadoop versions, use deleteOnExit to clean tmp files. - if (isNativeTable) { - autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]); - } - } catch (HiveException e) { - throw e; - } catch (Exception e) { - e.printStackTrace(); - throw new HiveException(e); - } - - filesCreated = true; - } - - /** - * Report status to JT so that JT won't kill this task if closing takes too long - * due to too many files to close and the NN is overloaded. - * - * @return true if a new progress update is reported, false otherwise. 
- */ - private boolean updateProgress() { - if (reporter != null && - (System.currentTimeMillis() - lastProgressReport) > timeOut) { - reporter.progress(); - lastProgressReport = System.currentTimeMillis(); - return true; - } else { - return false; - } - } - - Writable recordValue; - - @Override public void processOp(Object data, int tag) throws HiveException { VectorizedRowBatch vrg = (VectorizedRowBatch)data; @@ -652,15 +139,15 @@ public void processOp(Object data, int tag) throws HiveException { } } - rowOutWriters = fpaths.outWriters; + rowOutWriters = fpaths.getOutWriters(); if (conf.isGatherStats()) { if (statsCollectRawDataSize) { SerDeStats stats = serializer.getSerDeStats(); if (stats != null) { - fpaths.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); } } - fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1); + fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1); } @@ -682,417 +169,4 @@ public void processOp(Object data, int tag) throws HiveException { } } } - - /** - * Generate list bucketing directory name from a row. - * @param row row to process. - * @return directory name. - */ - private String generateListBucketingDirName(Object row) { - if (!this.isSkewedStoredAsSubDirectories) { - return null; - } - - String lbDirName = null; - List standObjs = new ArrayList(); - List skewedCols = lbCtx.getSkewedColNames(); - List> allSkewedVals = lbCtx.getSkewedColValues(); - List skewedValsCandidate = null; - Map, String> locationMap = lbCtx.getLbLocationMap(); - - /* Convert input row to standard objects. */ - ObjectInspectorUtils.copyToStandardObject(standObjs, row, - (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); - - assert (standObjs.size() >= skewedCols.size()) : - "The row has less number of columns than no. of skewed column."; - - skewedValsCandidate = new ArrayList(skewedCols.size()); - for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) { - skewedValsCandidate.add(posPair.getSkewColPosition(), - standObjs.get(posPair.getTblColPosition()).toString()); - } - /* The row matches skewed column names. */ - if (allSkewedVals.contains(skewedValsCandidate)) { - /* matches skewed values. */ - lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate); - locationMap.put(skewedValsCandidate, lbDirName); - } else { - /* create default directory. */ - lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols, - lbCtx.getDefaultDirName()); - List defaultKey = Arrays.asList(lbCtx.getDefaultKey()); - if (!locationMap.containsKey(defaultKey)) { - locationMap.put(defaultKey, lbDirName); - } - } - return lbDirName; - } - - /** - * Lookup list bucketing path. - * @param lbDirName - * @return - * @throws HiveException - */ - private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { - FSPaths fsp2 = valToPaths.get(lbDirName); - if (fsp2 == null) { - fsp2 = createNewPaths(lbDirName); - } - return fsp2; - } - - /** - * create new path. 
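The deletions above and below this point in VectorFileSinkOperator.java remove code that was duplicated from FileSinkOperator and is now simply inherited; only the vectorized processOp() and the two constructors remain. The net shape of the class after this patch, condensed from the + lines:

// Condensed outline only; the real processOp() body is shown in the hunks above.
public class VectorFileSinkOperator extends FileSinkOperator {
  private static final long serialVersionUID = 1L;

  public VectorFileSinkOperator(VectorizationContext context, OperatorDesc conf) {
    super();
    this.conf = (FileSinkDesc) conf;
  }

  public VectorFileSinkOperator() {
  }

  @Override
  public void processOp(Object data, int tag) throws HiveException {
    VectorizedRowBatch vrg = (VectorizedRowBatch) data;
    // Iterates the batch, serializes each row, and writes it through the inherited
    // machinery, using fpaths.getOutWriters() and fpaths.getStat() instead of the
    // formerly inaccessible fields.
  }
}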
- * - * @param dirName - * @return - * @throws HiveException - */ - private FSPaths createNewPaths(String dirName) throws HiveException { - FSPaths fsp2 = new FSPaths(specPath); - if (childSpecPathDynLinkedPartitions != null) { - fsp2.tmpPath = new Path(fsp2.tmpPath, - dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); - fsp2.taskOutputTempPath = - new Path(fsp2.taskOutputTempPath, - dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); - } else { - fsp2.tmpPath = new Path(fsp2.tmpPath, dirName); - fsp2.taskOutputTempPath = - new Path(fsp2.taskOutputTempPath, dirName); - } - createBucketFiles(fsp2); - valToPaths.put(dirName, fsp2); - return fsp2; - } - - private FSPaths getDynOutPaths(List row, String lbDirName) throws HiveException { - - FSPaths fp; - - // get the path corresponding to the dynamic partition columns, - String dpDir = getDynPartDirectory(row, dpColNames, numDynParts); - - if (dpDir != null) { - dpDir = appendListBucketingDirName(lbDirName, dpDir); - FSPaths fsp2 = valToPaths.get(dpDir); - - if (fsp2 == null) { - // check # of dp - if (valToPaths.size() > maxPartitions) { - // throw fatal error - incrCounter(fatalErrorCntr, 1); - fatalError = true; - LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions"); - } - fsp2 = createNewPaths(dpDir); - } - fp = fsp2; - } else { - fp = fsp; - } - return fp; - } - - /** - * Append list bucketing dir name to original dir name. - * Skewed columns cannot be partitioned columns. - * @param lbDirName - * @param dpDir - * @return - */ - private String appendListBucketingDirName(String lbDirName, String dpDir) { - StringBuilder builder = new StringBuilder(dpDir); - dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName) - .toString(); - return dpDir; - } - - // given the current input row, the mapping for input col info to dp columns, and # of dp cols, - // return the relative path corresponding to the row. - // e.g., ds=2008-04-08/hr=11 - private String getDynPartDirectory(List row, List dpColNames, int numDynParts) { - assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns"; - return FileUtils.makePartName(dpColNames, row); - } - - @Override - protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) { - errMsg.append("Operator ").append(getOperatorId()).append(" (id=").append(id).append("): "); - errMsg.append(counterCode > FATAL_ERR_MSG.length - 1 ? - "fatal error." 
: - FATAL_ERR_MSG[(int) counterCode]); - // number of partitions exceeds limit, list all the partition names - if (counterCode > 0) { - errMsg.append(lsDir()); - } - } - - // sample the partitions that are generated so that users have a sense of what's causing the error - private String lsDir() { - String specPath = conf.getDirName(); - // need to get a JobConf here because it's not passed through at client side - JobConf jobConf = new JobConf(ExecDriver.class); - Path tmpPath = Utilities.toTempPath(specPath); - StringBuilder sb = new StringBuilder("\n"); - try { - DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); - int numDP = dpCtx.getNumDPCols(); - FileSystem fs = tmpPath.getFileSystem(jobConf); - int level = numDP; - if (conf.isLinkedFileSink()) { - level++; - } - FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs); - sb.append("Sample of ") - .append(Math.min(status.length, 100)) - .append(" partitions created under ") - .append(tmpPath.toString()) - .append(":\n"); - for (int i = 0; i < status.length; ++i) { - sb.append("\t.../"); - sb.append(getPartitionSpec(status[i].getPath(), numDP)) - .append("\n"); - } - sb.append("...\n"); - } catch (Exception e) { - // cannot get the subdirectories, just return the root directory - sb.append(tmpPath).append("...\n").append(e.getMessage()); - e.printStackTrace(); - } finally { - return sb.toString(); - } - } - - private String getPartitionSpec(Path path, int level) { - Stack st = new Stack(); - Path p = path; - for (int i = 0; i < level; ++i) { - st.push(p.getName()); - p = p.getParent(); - } - StringBuilder sb = new StringBuilder(); - while (!st.empty()) { - sb.append(st.pop()); - } - return sb.toString(); - } - - @Override - public void closeOp(boolean abort) throws HiveException { - if (!bDynParts && !filesCreated) { - createBucketFiles(fsp); - } - - lastProgressReport = System.currentTimeMillis(); - if (!abort) { - for (FSPaths fsp : valToPaths.values()) { - fsp.closeWriters(abort); - if (isNativeTable) { - fsp.commit(fs); - } - } - // Only publish stats if this operator's flag was set to gather stats - if (conf.isGatherStats()) { - publishStats(); - } - } else { - // Will come here if an Exception was thrown in map() or reduce(). - // Hadoop always call close() even if an Exception was thrown in map() or - // reduce(). 
- for (FSPaths fsp : valToPaths.values()) { - fsp.abortWriters(fs, abort, !autoDelete && isNativeTable); - } - } - } - - /** - * @return the name of the operator - */ - @Override - public String getName() { - return getOperatorName(); - } - - static public String getOperatorName() { - return "FS"; - } - - @Override - public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack) - throws HiveException { - try { - if ((conf != null) && isNativeTable) { - String specPath = conf.getDirName(); - DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); - if (conf.isLinkedFileSink() && (dpCtx != null)) { - specPath = conf.getParentDir(); - } - Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, - reporter); - } - } catch (IOException e) { - throw new HiveException(e); - } - super.jobCloseOp(hconf, success, feedBack); - } - - @Override - public OperatorType getType() { - return OperatorType.FILESINK; - } - - @Override - public void augmentPlan() { - PlanUtils.configureOutputJobPropertiesForStorageHandler( - getConf().getTableInfo()); - } - - public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { - if (hiveOutputFormat == null) { - try { - hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); - } catch (Exception ex) { - throw new IOException(ex); - } - } - Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), job); - - if (conf.getTableInfo().isNonNative()) { - //check the ouput specs only if it is a storage handler (native tables's outputformats does - //not set the job's output properties correctly) - try { - hiveOutputFormat.checkOutputSpecs(ignored, job); - } catch (NoSuchMethodError e) { - //For BC, ignore this for now, but leave a log message - LOG.warn("HiveOutputFormat should implement checkOutputSpecs() method`"); - } - } - } - - private void publishStats() throws HiveException { - boolean isStatsReliable = conf.isStatsReliable(); - - // Initializing a stats publisher - StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc); - - if (statsPublisher == null) { - // just return, stats gathering should not block the main query - LOG.error("StatsPublishing error: StatsPublisher is not initialized."); - if (isStatsReliable) { - throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg()); - } - return; - } - - if (!statsPublisher.connect(hconf)) { - // just return, stats gathering should not block the main query - LOG.error("StatsPublishing error: cannot connect to database"); - if (isStatsReliable) { - throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg()); - } - return; - } - - String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)); - String spSpec = conf.getStaticSpec() != null ? 
conf.getStaticSpec() : ""; - - for (String fspKey : valToPaths.keySet()) { - FSPaths fspValue = valToPaths.get(fspKey); - String key; - - // construct the key(fileID) to insert into the intermediate stats table - if (fspKey == "") { - // for non-partitioned/static partitioned table, the key for temp storage is - // common key prefix + static partition spec + taskID - String keyPrefix = Utilities.getHashedStatsPrefix( - conf.getStatsAggPrefix() + spSpec, conf.getMaxStatsKeyPrefixLength()); - key = keyPrefix + taskID; - } else { - // for partitioned table, the key is - // common key prefix + static partition spec + DynamicPartSpec + taskID - key = createKeyForStatsPublisher(taskID, spSpec, fspKey); - } - Map statsToPublish = new HashMap(); - for (String statType : fspValue.stat.getStoredStats()) { - statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType))); - } - if (!statsPublisher.publishStat(key, statsToPublish)) { - // The original exception is lost. - // Not changing the interface to maintain backward compatibility - if (isStatsReliable) { - throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg()); - } - } - } - if (!statsPublisher.closeConnection()) { - // The original exception is lost. - // Not changing the interface to maintain backward compatibility - if (isStatsReliable) { - throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg()); - } - } - } - - /** - * This is server side code to create key in order to save statistics to stats database. - * Client side will read it via StatsTask.java aggregateStats(). - * Client side reads it via db query prefix which is based on partition spec. - * Since store-as-subdir information is not part of partition spec, we have to - * remove store-as-subdir information from variable "keyPrefix" calculation. - * But we have to keep store-as-subdir information in variable "key" calculation - * since each skewed value has a row in stats db and "key" is db key, - * otherwise later value overwrites previous value. - * Performance impact due to string handling is minimum since this method is - * only called once in FileSinkOperator closeOp(). - * For example, - * create table test skewed by (key, value) on (('484','val_484') stored as DIRECTORIES; - * skewedValueDirList contains 2 elements: - * 1. key=484/value=val_484 - * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME - * Case #1: Static partition with store-as-sub-dir - * spSpec has SP path - * fspKey has either - * key=484/value=val_484 or - * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME - * After filter, fspKey is empty, storedAsDirPostFix has either - * key=484/value=val_484 or - * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME - * so, at the end, "keyPrefix" doesnt have subdir information but "key" has - * Case #2: Dynamic partition with store-as-sub-dir. 
Assume dp part is hr - * spSpec has SP path - * fspKey has either - * hr=11/key=484/value=val_484 or - * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME - * After filter, fspKey is hr=11, storedAsDirPostFix has either - * key=484/value=val_484 or - * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME - * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has - * @param taskID - * @param spSpec - * @param fspKey - * @return - */ - private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) { - String key; - String newFspKey = fspKey; - String storedAsDirPostFix = ""; - if (isSkewedStoredAsSubDirectories) { - List skewedValueDirList = this.lbCtx.getSkewedValuesDirNames(); - for (String dir : skewedValueDirList) { - newFspKey = newFspKey.replace(dir, ""); - if (!newFspKey.equals(fspKey)) { - storedAsDirPostFix = dir; - break; - } - } - } - String keyPrefix = Utilities.getHashedStatsPrefix( - conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR, - conf.getMaxStatsKeyPrefixLength()); - key = keyPrefix + storedAsDirPostFix + taskID; - return key; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index 93dd387..2d40037 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -48,31 +49,47 @@ } private final transient LongWritable filtered_count, passed_count; - private transient VectorExpression conditionEvaluator; + private VectorExpression conditionEvaluator = null; transient int heartbeatInterval; - private final VectorizationContext vContext; - public VectorFilterOperator(VectorizationContext ctxt, OperatorDesc conf) { + // filterMode is 1 if condition is always true, -1 if always false + // and 0 if condition needs to be computed. 
+ transient private int filterMode = 0; + + public VectorFilterOperator(VectorizationContext vContext, OperatorDesc conf) + throws HiveException { + this(); + vContext.setOperatorType(OperatorType.FILTER); + ExprNodeDesc oldExpression = ((FilterDesc) conf).getPredicate(); + conditionEvaluator = vContext.getVectorExpression(oldExpression); + } + + public VectorFilterOperator() { super(); - this.vContext = ctxt; filtered_count = new LongWritable(); passed_count = new LongWritable(); this.conf = (FilterDesc) conf; } + @Override protected void initializeOp(Configuration hconf) throws HiveException { try { heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); - ExprNodeDesc oldExpression = conf.getPredicate(); - vContext.setOperatorType(OperatorType.FILTER); - conditionEvaluator = vContext.getVectorExpression(oldExpression); statsMap.put(Counter.FILTERED, filtered_count); statsMap.put(Counter.PASSED, passed_count); } catch (Throwable e) { throw new HiveException(e); } + if (conditionEvaluator instanceof ConstantVectorExpression) { + ConstantVectorExpression cve = (ConstantVectorExpression) this.conditionEvaluator; + if (cve.getLongValue() == 1) { + filterMode = 1; + } else { + filterMode = -1; + } + } initializeChildren(hconf); } @@ -86,7 +103,18 @@ public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch vrg = (VectorizedRowBatch) row; //Evaluate the predicate expression //The selected vector represents selected rows. - conditionEvaluator.evaluate(vrg); + switch (filterMode) { + case 0: + conditionEvaluator.evaluate(vrg); + break; + case -1: + // All will be filtered out + vrg.size = 0; + break; + case 1: + default: + // All are selected, do nothing + } if (vrg.size > 0) { forward(vrg, null); } @@ -108,4 +136,12 @@ static public String getOperatorName() { public OperatorType getType() { return OperatorType.FILTER; } + + public VectorExpression getConditionEvaluator() { + return conditionEvaluator; + } + + public void setConditionEvaluator(VectorExpression conditionEvaluator) { + this.conditionEvaluator = conditionEvaluator; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index d29c1bb..c8f0825 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -57,21 +57,19 @@ private static final Log LOG = LogFactory.getLog( VectorGroupByOperator.class.getName()); - private final VectorizationContext vContext; - /** * This is the vector of aggregators. They are stateless and only implement * the algorithm of how to compute the aggregation. state is kept in the * aggregation buffers and is our responsibility to match the proper state for each key. */ - private transient VectorAggregateExpression[] aggregators; + private VectorAggregateExpression[] aggregators; /** * Key vector expressions. */ - private transient VectorExpression[] keyExpressions; + private VectorExpression[] keyExpressions; - private VectorExpressionWriter[] keyOutputWriters; + private transient VectorExpressionWriter[] keyOutputWriters; /** * The aggregation buffers to use for the current batch. 
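On the VectorFilterOperator change above: the predicate is now compiled to a VectorExpression in the constructor, and initializeOp() special-cases the result when it is a ConstantVectorExpression, so a predicate that folds to a constant is never evaluated per batch. A hypothetical test-style sketch using the constructor and setter added in this patch (the constant expression itself is assumed to come from the planner):

// Hypothetical: building a filter whose predicate already folded to constant false.
static VectorFilterOperator dropAllFilter(VectorExpression constantFalse) {
  // 'constantFalse' is assumed to be a ConstantVectorExpression with getLongValue() == 0.
  VectorFilterOperator op = new VectorFilterOperator();
  op.setConditionEvaluator(constantFalse);
  // When initializeOp() runs it will set filterMode = -1, so processOp() forces
  // vrg.size = 0 on every batch instead of calling conditionEvaluator.evaluate(vrg).
  return op;
}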
@@ -141,10 +139,24 @@ private static final long serialVersionUID = 1L; - public VectorGroupByOperator(VectorizationContext ctxt, OperatorDesc conf) { + public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf) + throws HiveException { + this(); + GroupByDesc desc = (GroupByDesc) conf; + this.conf = desc; + vContext.setOperatorType(OperatorType.GROUPBY); + List keysDesc = desc.getKeys(); + keyExpressions = vContext.getVectorExpressions(keysDesc); + ArrayList aggrDesc = desc.getAggregators(); + aggregators = new VectorAggregateExpression[aggrDesc.size()]; + for (int i = 0; i < aggrDesc.size(); ++i) { + AggregationDesc aggDesc = aggrDesc.get(i); + aggregators[i] = vContext.getAggregatorExpression(aggDesc); + } + } + + public VectorGroupByOperator() { super(); - this.vContext = ctxt; - this.conf = (GroupByDesc) conf; } @Override @@ -152,11 +164,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { List objectInspectors = new ArrayList(); + List keysDesc = conf.getKeys(); try { - vContext.setOperatorType(OperatorType.GROUPBY); - - List keysDesc = conf.getKeys(); - keyExpressions = vContext.getVectorExpressions(keysDesc); keyOutputWriters = new VectorExpressionWriter[keyExpressions.length]; @@ -166,11 +175,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { objectInspectors.add(keyOutputWriters[i].getObjectInspector()); } - ArrayList aggrDesc = conf.getAggregators(); - aggregators = new VectorAggregateExpression[aggrDesc.size()]; - for (int i = 0; i < aggrDesc.size(); ++i) { - AggregationDesc desc = aggrDesc.get(i); - aggregators[i] = vContext.getAggregatorExpression (desc); + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].init(conf.getAggregators().get(i)); objectInspectors.add(aggregators[i].getOutputObjectInspector()); } @@ -215,13 +221,15 @@ private void computeMemoryLimits() { maxHashTblMemory = (int)(maxMemory * memoryThreshold); - LOG.info(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", - maxHashTblMemory/1024/1024, - maxMemory/1024/1024, - memoryThreshold, - fixedHashEntrySize, - keyWrappersBatch.getKeysFixedSize(), - aggregationBatchInfo.getAggregatorsFixedSize())); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", + maxHashTblMemory/1024/1024, + maxMemory/1024/1024, + memoryThreshold, + fixedHashEntrySize, + keyWrappersBatch.getKeysFixedSize(), + aggregationBatchInfo.getAggregatorsFixedSize())); + } } @@ -264,15 +272,16 @@ private void flush(boolean all) throws HiveException { (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH); int entriesFlushed = 0; - LOG.info(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", - entriesToFlush, all ? "(all)" : "", - numEntriesHashTable, fixedHashEntrySize, avgVariableSize, - numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, - maxHashTblMemory/1024/1024)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", + entriesToFlush, all ? 
"(all)" : "", + numEntriesHashTable, fixedHashEntrySize, avgVariableSize, + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, + maxHashTblMemory/1024/1024)); + } Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { - // if this is a global aggregation (no keys) and empty set, must still emit NULLs VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); for (int i = 0; i < aggregators.length; ++i) { @@ -280,7 +289,6 @@ private void flush(boolean all) throws HiveException { } forward(forwardCache, outputObjInspector); } else { - /* Iterate the global (keywrapper,aggregationbuffers) map and emit a row for each key */ Iterator> iter = @@ -297,8 +305,10 @@ private void flush(boolean all) throws HiveException { forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() .getAggregationBuffer(i)); } - LOG.debug(String.format("forwarding keys: %s: %s", - pair.getKey().toString(), Arrays.toString(forwardCache))); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("forwarding keys: %s: %s", + pair.getKey().toString(), Arrays.toString(forwardCache))); + } forward(forwardCache, outputObjInspector); if (!all) { @@ -441,5 +451,21 @@ public OperatorType getType() { return OperatorType.GROUPBY; } + public VectorExpression[] getKeyExpressions() { + return keyExpressions; + } + + public void setKeyExpressions(VectorExpression[] keyExpressions) { + this.keyExpressions = keyExpressions; + } + + public VectorAggregateExpression[] getAggregators() { + return aggregators; + } + + public void setAggregators(VectorAggregateExpression[] aggregators) { + this.aggregators = aggregators; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java deleted file mode 100644 index d53bffc..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ /dev/null @@ -1,775 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * Map operator. This triggers overall map side processing. This is a little - * different from regular operators in that it starts off by processing a - * Writable data structure from a Table (instead of a Hive Object). - **/ -public class VectorMapOperator extends Operator implements Serializable, Cloneable { - - private static final long serialVersionUID = 1L; - - /** - * Counter. 
- * - */ - public static enum Counter { - DESERIALIZE_ERRORS - } - - private final transient LongWritable deserialize_error_count = new LongWritable(); - private transient Deserializer deserializer; - - private transient Object[] rowWithPart; - private transient Writable[] vcValues; - private transient List vcs; - private transient Object[] rowWithPartAndVC; - private transient StructObjectInspector tblRowObjectInspector; - // convert from partition to table schema - private transient Converter partTblObjectInspectorConverter; - private transient boolean isPartitioned; - private Map opCtxMap; - private final Set listInputPaths = new HashSet(); - - private Map, ArrayList> operatorToPaths; - - private final Map, MapOpCtx> childrenOpToOpCtxMap = - new HashMap, MapOpCtx>(); - - private ArrayList> extraChildrenToClose = null; - private VectorizationContext vectorizationContext = null; - private boolean outputColumnsInitialized = false;; - - private static class MapInputPath { - String path; - String alias; - Operator op; - - /** - * @param path - * @param alias - * @param op - */ - public MapInputPath(String path, String alias, - Operator op) { - this.path = path; - this.alias = alias; - this.op = op; - } - - @Override - public boolean equals(Object o) { - if (o instanceof MapInputPath) { - MapInputPath mObj = (MapInputPath) o; - return path.equals(mObj.path) && alias.equals(mObj.alias) - && op.equals(mObj.op); - } - - return false; - } - - @Override - public int hashCode() { - int ret = (path == null) ? 0 : path.hashCode(); - ret += (alias == null) ? 0 : alias.hashCode(); - ret += (op == null) ? 0 : op.hashCode(); - return ret; - } - - public Operator getOp() { - return op; - } - - public void setOp(Operator op) { - this.op = op; - } - - } - - private static class MapOpCtx { - private final boolean isPartitioned; - private final StructObjectInspector tblRawRowObjectInspector; // without partition - private final StructObjectInspector partObjectInspector; // partition - private final StructObjectInspector rowObjectInspector; - private final Converter partTblObjectInspectorConverter; - private final Object[] rowWithPart; - private final Object[] rowWithPartAndVC; - private final Deserializer deserializer; - private String tableName; - private String partName; - - /** - * @param isPartitioned - * @param rowObjectInspector - * @param rowWithPart - */ - public MapOpCtx(boolean isPartitioned, - StructObjectInspector rowObjectInspector, - StructObjectInspector tblRawRowObjectInspector, - StructObjectInspector partObjectInspector, - Object[] rowWithPart, - Object[] rowWithPartAndVC, - Deserializer deserializer, - Converter partTblObjectInspectorConverter) { - this.isPartitioned = isPartitioned; - this.rowObjectInspector = rowObjectInspector; - this.tblRawRowObjectInspector = tblRawRowObjectInspector; - this.partObjectInspector = partObjectInspector; - this.rowWithPart = rowWithPart; - this.rowWithPartAndVC = rowWithPartAndVC; - this.deserializer = deserializer; - this.partTblObjectInspectorConverter = partTblObjectInspectorConverter; - } - - /** - * @return the isPartitioned - */ - public boolean isPartitioned() { - return isPartitioned; - } - - /** - * @return the rowObjectInspector - */ - public StructObjectInspector getRowObjectInspector() { - return rowObjectInspector; - } - - public StructObjectInspector getTblRawRowObjectInspector() { - return tblRawRowObjectInspector; - } - - /** - * @return the rowWithPart - */ - public Object[] getRowWithPart() { - return rowWithPart; - } - - /** - * @return 
the rowWithPartAndVC - */ - public Object[] getRowWithPartAndVC() { - return rowWithPartAndVC; - } - - /** - * @return the deserializer - */ - public Deserializer getDeserializer() { - return deserializer; - } - - public Converter getPartTblObjectInspectorConverter() { - return partTblObjectInspectorConverter; - } - } - - /** - * Initializes this map op as the root of the tree. It sets JobConf & - * MapRedWork and starts initialization of the operator tree rooted at this - * op. - * - * @param hconf - * @param mrwork - * @throws HiveException - */ - public void initializeAsRoot(Configuration hconf, MapredWork mrwork) - throws HiveException { - setConf(mrwork); - setChildren(hconf); - initialize(hconf, null); - } - - private MapOpCtx initObjectInspector(MapredWork conf, - Configuration hconf, String onefile, Map convertedOI) - throws HiveException, - ClassNotFoundException, InstantiationException, IllegalAccessException, - SerDeException { - PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile); - LinkedHashMap partSpec = pd.getPartSpec(); - // Use tblProps in case of unpartitioned tables - Properties partProps = - (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ? - pd.getTableDesc().getProperties() : pd.getProperties(); - - Class serdeclass = pd.getDeserializerClass(); - if (serdeclass == null) { - String className = pd.getSerdeClassName(); - if ((className == null) || (className.isEmpty())) { - throw new HiveException( - "SerDe class or the SerDe class name is not set for table: " - + pd.getProperties().getProperty("name")); - } - serdeclass = hconf.getClassByName(className); - } - - String tableName = String.valueOf(partProps.getProperty("name")); - String partName = String.valueOf(partSpec); - Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); - partDeserializer.initialize(hconf, partProps); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); - - StructObjectInspector tblRawRowObjectInspector = convertedOI.get(pd.getTableDesc()); - - partTblObjectInspectorConverter = - ObjectInspectorConverters.getConverter(partRawRowObjectInspector, - tblRawRowObjectInspector); - - MapOpCtx opCtx = null; - // Next check if this table has partitions and if so - // get the list of partition names as well as allocate - // the serdes for the partition columns - String pcols = partProps - .getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - // Log LOG = LogFactory.getLog(MapOperator.class.getName()); - if (pcols != null && pcols.length() > 0) { - String[] partKeys = pcols.trim().split("/"); - List partNames = new ArrayList(partKeys.length); - Object[] partValues = new Object[partKeys.length]; - List partObjectInspectors = new ArrayList( - partKeys.length); - for (int i = 0; i < partKeys.length; i++) { - String key = partKeys[i]; - partNames.add(key); - // Partitions do not exist for this table - if (partSpec == null) { - // for partitionless table, initialize partValue to null - partValues[i] = null; - } else { - partValues[i] = new Text(partSpec.get(key)); - } - partObjectInspectors - .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); - } - StructObjectInspector partObjectInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(partNames, partObjectInspectors); - - Object[] rowWithPart = new Object[2]; - rowWithPart[1] = partValues; - StructObjectInspector rowObjectInspector = ObjectInspectorFactory - 
.getUnionStructObjectInspector(Arrays - .asList(new StructObjectInspector[] {tblRawRowObjectInspector, partObjectInspector})); - // LOG.info("dump " + tableName + " " + partName + " " + - // rowObjectInspector.getTypeName()); - opCtx = new MapOpCtx(true, rowObjectInspector, tblRawRowObjectInspector, partObjectInspector, - rowWithPart, null, partDeserializer, partTblObjectInspectorConverter); - } else { - // LOG.info("dump2 " + tableName + " " + partName + " " + - // rowObjectInspector.getTypeName()); - opCtx = new MapOpCtx(false, tblRawRowObjectInspector, tblRawRowObjectInspector, null, null, - null, partDeserializer, partTblObjectInspectorConverter); - } - opCtx.tableName = tableName; - opCtx.partName = partName; - return opCtx; - } - - // Return the mapping for table descriptor to the expected table OI - /** - * Traverse all the partitions for a table, and get the OI for the table. - * Note that a conversion is required if any of the partition OI is different - * from the table OI. For eg. if the query references table T (partitions P1, P2), - * and P1's schema is same as T, whereas P2's scheme is different from T, conversion - * might be needed for both P1 and P2, since SettableOI might be needed for T - */ - private Map getConvertedOI(Configuration hconf) - throws HiveException { - Map tableDescOI = - new HashMap(); - Set identityConverterTableDesc = new HashSet(); - try - { - for (String onefile : conf.getMapWork().getPathToAliases().keySet()) { - PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile); - TableDesc tableDesc = pd.getTableDesc(); - Properties tblProps = tableDesc.getProperties(); - // If the partition does not exist, use table properties - Properties partProps = - (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ? 
- tblProps : pd.getProperties(); - - Class sdclass = pd.getDeserializerClass(); - if (sdclass == null) { - String className = pd.getSerdeClassName(); - if ((className == null) || (className.isEmpty())) { - throw new HiveException( - "SerDe class or the SerDe class name is not set for table: " - + pd.getProperties().getProperty("name")); - } - sdclass = hconf.getClassByName(className); - } - - Deserializer partDeserializer = (Deserializer) sdclass.newInstance(); - partDeserializer.initialize(hconf, partProps); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); - - StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc); - if ((tblRawRowObjectInspector == null) || - (identityConverterTableDesc.contains(tableDesc))) { - sdclass = tableDesc.getDeserializerClass(); - if (sdclass == null) { - String className = tableDesc.getSerdeClassName(); - if ((className == null) || (className.isEmpty())) { - throw new HiveException( - "SerDe class or the SerDe class name is not set for table: " - + tableDesc.getProperties().getProperty("name")); - } - sdclass = hconf.getClassByName(className); - } - Deserializer tblDeserializer = (Deserializer) sdclass.newInstance(); - tblDeserializer.initialize(hconf, tblProps); - tblRawRowObjectInspector = - (StructObjectInspector) ObjectInspectorConverters.getConvertedOI( - partRawRowObjectInspector, - (StructObjectInspector) tblDeserializer.getObjectInspector()); - - if (identityConverterTableDesc.contains(tableDesc)) { - if (!partRawRowObjectInspector.equals(tblRawRowObjectInspector)) { - identityConverterTableDesc.remove(tableDesc); - } - } - else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) { - identityConverterTableDesc.add(tableDesc); - } - - tableDescOI.put(tableDesc, tblRawRowObjectInspector); - } - } - } catch (Exception e) { - throw new HiveException(e); - } - return tableDescOI; - } - - public void setChildren(Configuration hconf) throws HiveException { - - Path fpath = new Path((new Path(HiveConf.getVar(hconf, - HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath()); - - ArrayList> children = - new ArrayList>(); - opCtxMap = new HashMap(); - operatorToPaths = new HashMap, ArrayList>(); - - statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count); - Map convertedOI = getConvertedOI(hconf); - Map> aliasToVectorOpMap = - new HashMap>(); - - try { - for (String onefile : conf.getMapWork().getPathToAliases().keySet()) { - MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI); - //Create columnMap - Map columnMap = new HashMap(); - StructObjectInspector rowInspector = opCtx.getRowObjectInspector(); - - int columnCount = 0; - for (StructField sfield : rowInspector.getAllStructFieldRefs()) { - columnMap.put(sfield.getFieldName(), columnCount); - System.out.println("Column Name: " + sfield.getFieldName() + ", " + - "column index: " + columnCount); - LOG.info("Column Name: " + sfield.getFieldName() + ", " + - "column index: " + columnCount); - columnCount++; - } - - Path onepath = new Path(new Path(onefile).toUri().getPath()); - List aliases = conf.getMapWork().getPathToAliases().get(onefile); - - vectorizationContext = new VectorizationContext(columnMap, columnCount); - - for (String onealias : aliases) { - Operator op = conf.getMapWork().getAliasToWork().get( - onealias); - LOG.info("Adding alias " + onealias + " to work list for file " - + onefile); - - Operator vectorOp = aliasToVectorOpMap.get(onealias); - - if (vectorOp == null) 
{ - vectorOp = vectorizeOperator(op, vectorizationContext); - aliasToVectorOpMap.put(onealias, vectorOp); - } - - System.out.println("Using vectorized op: "+ vectorOp.getName()); - LOG.info("Using vectorized op: " + vectorOp.getName()); - op = vectorOp; - MapInputPath inp = new MapInputPath(onefile, onealias, op); - opCtxMap.put(inp, opCtx); - if (operatorToPaths.get(op) == null) { - operatorToPaths.put(op, new ArrayList()); - } - operatorToPaths.get(op).add(onefile); - op.setParentOperators(new ArrayList>()); - op.getParentOperators().add(this); - // check for the operators who will process rows coming to this Map - // Operator - if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { - children.add(op); - childrenOpToOpCtxMap.put(op, opCtx); - LOG.info("dump " + op.getName() + " " - + opCtxMap.get(inp).getRowObjectInspector().getTypeName()); - } - } - } - - if (children.size() == 0) { - // didn't find match for input file path in configuration! - // serious problem .. - LOG.error("Configuration does not have any alias for path: " - + fpath.toUri().getPath()); - throw new HiveException("Configuration and input path are inconsistent"); - } - - // we found all the operators that we are supposed to process. - setChildOperators(children); - } catch (Exception e) { - throw new HiveException(e); - } - } - - public static Operator vectorizeOperator - (Operator op, VectorizationContext - vectorizationContext) throws HiveException, CloneNotSupportedException { - - Operator vectorOp; - boolean recursive = true; - - switch (op.getType()) { - case GROUPBY: - vectorOp = new VectorGroupByOperator(vectorizationContext, op.getConf()); - recursive = false; - break; - case FILTER: - vectorOp = new VectorFilterOperator(vectorizationContext, op.getConf()); - break; - case SELECT: - vectorOp = new VectorSelectOperator(vectorizationContext, op.getConf()); - break; - case FILESINK: - vectorOp = new VectorFileSinkOperator(vectorizationContext, op.getConf()); - break; - case TABLESCAN: - vectorOp = op.cloneOp(); - break; - case REDUCESINK: - vectorOp = new VectorReduceSinkOperator(vectorizationContext, op.getConf()); - break; - default: - throw new HiveException("Operator: " + op.getName() + ", " + - "not vectorized"); - } - - if (recursive) { - List> children = op.getChildOperators(); - if (children != null && !children.isEmpty()) { - List> vectorizedChildren = new - ArrayList>(children.size()); - for (Operator childOp : children) { - Operator vectorizedChild = - vectorizeOperator(childOp, vectorizationContext); - List> parentList = - new ArrayList>(); - parentList.add(vectorOp); - vectorizedChild.setParentOperators(parentList); - vectorizedChildren.add(vectorizedChild); - } - vectorOp.setChildOperators(vectorizedChildren); - } - } else { - // transfer the row-mode children to the vectorized op parent - List> children = - new ArrayList>(); - - if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) { - List> parentList = - new ArrayList>(); - parentList.add(vectorOp); - for (Operator childOp : op.getChildOperators()) { - Operator clonedOp = childOp.cloneRecursiveChildren(); - clonedOp.setParentOperators(parentList); - children.add(clonedOp); - } - vectorOp.setChildOperators(children); - } - } - - return vectorOp; - } - - private void plugIntermediate (Operator parent, - Operator plug) { - - List> plugList = - new ArrayList>(); - plugList.add(plug); - - List> parentAsList = - new ArrayList>(); - parentAsList.add(parent); - - - List> children = parent.getChildOperators(); - if 
(children != null && !children.isEmpty()) { - for (Operator childOp : children) { - childOp.setParentOperators(plugList); - } - } - plug.setChildOperators(children); - plug.setParentOperators(parentAsList); - parent.setChildOperators (plugList); - } - - @Override - public void initializeOp(Configuration hconf) throws HiveException { - // set that parent initialization is done and call initialize on children - state = State.INIT; - List> children = getChildOperators(); - - for (Entry, MapOpCtx> entry : childrenOpToOpCtxMap - .entrySet()) { - Operator child = entry.getKey(); - MapOpCtx mapOpCtx = entry.getValue(); - // Add alias, table name, and partitions to hadoop conf so that their - // children will - // inherit these - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, - mapOpCtx.tableName); - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, - mapOpCtx.partName); - child.initialize(hconf, new ObjectInspector[] {mapOpCtx.getRowObjectInspector()}); - } - - for (Entry entry : opCtxMap.entrySet()) { - // Add alias, table name, and partitions to hadoop conf so that their - // children will - // inherit these - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, - entry.getValue().tableName); - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry - .getValue().partName); - MapInputPath input = entry.getKey(); - Operator op = input.op; - // op is not in the children list, so need to remember it and close it - // afterwards - if (children.indexOf(op) == -1) { - if (extraChildrenToClose == null) { - extraChildrenToClose = new ArrayList>(); - } - extraChildrenToClose.add(op); - op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()}); - } - } - } - - /** - * close extra child operators that are initialized but are not executed. - */ - @Override - public void closeOp(boolean abort) throws HiveException { - if (extraChildrenToClose != null) { - for (Operator op : extraChildrenToClose) { - op.close(abort); - } - } - } - - // Change the serializer etc. since it is a new file, and split can span - // multiple files/partitions. 
- @Override - public void cleanUpInputFileChangedOp() throws HiveException { - Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile())) - .toUri().getPath()); - - for (String onefile : conf.getMapWork().getPathToAliases().keySet()) { - Path onepath = new Path(new Path(onefile).toUri().getPath()); - // check for the operators who will process rows coming to this Map - // Operator - if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { - String onealias = conf.getMapWork().getPathToAliases().get(onefile).get(0); - Operator op = - conf.getMapWork().getAliasToWork().get(onealias); - - LOG.info("Processing alias " + onealias + " for file " + onefile); - - MapInputPath inp = new MapInputPath(onefile, onealias, op); - //setInspectorInput(inp); - break; - } - } - } - - public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx, - List vcs, Writable[] vcValues, Deserializer deserializer) { - if (vcs == null) { - return vcValues; - } - if (vcValues == null) { - vcValues = new Writable[vcs.size()]; - } - for (int i = 0; i < vcs.size(); i++) { - VirtualColumn vc = vcs.get(i); - if (vc.equals(VirtualColumn.FILENAME)) { - if (ctx.inputFileChanged()) { - vcValues[i] = new Text(ctx.getCurrentInputFile()); - } - } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) { - long current = ctx.getIoCxt().getCurrentBlockStart(); - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } else if (vc.equals(VirtualColumn.ROWOFFSET)) { - long current = ctx.getIoCxt().getCurrentRow(); - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } else if (vc.equals(VirtualColumn.RAWDATASIZE)) { - long current = 0L; - SerDeStats stats = deserializer.getSerDeStats(); - if(stats != null) { - current = stats.getRawDataSize(); - } - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } - } - return vcValues; - } - - public void process(Object value) throws HiveException { - // A mapper can span multiple files/partitions. - // The serializers need to be reset if the input file changed - if ((this.getExecContext() != null) && - this.getExecContext().inputFileChanged()) { - // The child operators cleanup if input file has changed - cleanUpInputFileChanged(); - } - - // The row has been converted to comply with table schema, irrespective of partition schema. 
- // So, use tblOI (and not partOI) for forwarding - try { - if (value instanceof VectorizedRowBatch) { - if (!outputColumnsInitialized ) { - VectorizedRowBatch vrg = (VectorizedRowBatch) value; - Map outputColumnTypes = - vectorizationContext.getOutputColumnTypeMap(); - if (!outputColumnTypes.isEmpty()) { - int origNumCols = vrg.numCols; - int newNumCols = vrg.cols.length+outputColumnTypes.keySet().size(); - vrg.cols = Arrays.copyOf(vrg.cols, newNumCols); - for (int i = origNumCols; i < newNumCols; i++) { - vrg.cols[i] = vectorizationContext.allocateColumnVector(outputColumnTypes.get(i), - VectorizedRowBatch.DEFAULT_SIZE); - } - } - outputColumnsInitialized = true; - } - forward(value, null); - } else { - Object row = null; - row = this.partTblObjectInspectorConverter.convert(deserializer.deserialize((Writable) value)); - forward(row, tblRowObjectInspector); - } - } catch (Exception e) { - throw new HiveException("Hive Runtime Error while processing ", e); - } - } - - @Override - public void processOp(Object row, int tag) throws HiveException { - throw new HiveException("Hive 2 Internal error: should not be called!"); - } - - @Override - public String getName() { - return getOperatorName(); - } - - static public String getOperatorName() { - return "MAP"; - } - - @Override - public OperatorType getType() { - return null; - } - -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java index d177e29..33711c6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java @@ -52,26 +52,24 @@ private static final long serialVersionUID = 1L; - private final VectorizationContext vContext; - /** * The evaluators for the key columns. Key columns decide the sort order on * the reducer side. Key columns are passed to the reducer in the "key". */ - protected transient VectorExpression[] keyEval; - + protected VectorExpression[] keyEval; + /** * The key value writers. These know how to write the necessary writable type * based on key column metadata, from the primitive vector type. */ protected transient VectorExpressionWriter[] keyWriters; - + /** * The evaluators for the value columns. Value columns are passed to reducer * in the "value". */ - protected transient VectorExpression[] valueEval; - + protected VectorExpression[] valueEval; + /** * The output value writers. These know how to write the necessary writable type * based on value column metadata, from the primitive vector type. @@ -83,19 +81,19 @@ * Hive language). Partition columns decide the reducer that the current row * goes to. Partition columns are not passed to reducer. */ - protected transient VectorExpression[] partitionEval; - + protected VectorExpression[] partitionEval; + /** * The partition value writers. These know how to write the necessary writable type * based on partition column metadata, from the primitive vector type. 
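The deletion of VectorMapOperator above is the structural heart of the change: operators are no longer swapped for their Vector* counterparts at map-task startup, so the vector operators now have to travel inside the serialized plan. That is consistent with the pattern running through the operator hunks above and below: expression fields lose their transient modifier, writer/helper fields gain it, and each operator picks up a no-arg constructor plus getters and setters. A hedged sketch of that state split, using placeholder field types rather than the real vector expression classes:

import java.io.Serializable;

// Illustrative only: plan-time state is serialized with the operator,
// run-time helpers stay transient and are rebuilt in initializeOp().
class SketchVectorOperator implements Serializable {
  private static final long serialVersionUID = 1L;

  // Bound at plan time (in the VectorizationContext-taking constructor) and serialized.
  private String[] expressions;

  // Rebuilt on the task side in initializeOp(); never serialized.
  private transient Object[] writers;

  SketchVectorOperator() {
    // no-arg constructor required when the serialized plan is rehydrated
  }

  SketchVectorOperator(String[] expressions) {
    this.expressions = expressions;
  }

  void initializeOp() {
    writers = new Object[expressions.length]; // runtime-only helpers
  }

  // Accessors mirror the getKeyEval/setKeyEval-style methods added in the patch.
  String[] getExpressions() { return expressions; }
  void setExpressions(String[] expressions) { this.expressions = expressions; }
}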
- */ + */ protected transient VectorExpressionWriter[] partitionWriters; - private int numDistributionKeys; + private transient int numDistributionKeys; - private List> distinctColIndices; + private transient List> distinctColIndices; - private int numDistinctExprs; + private transient int numDistinctExprs; transient HiveKey keyWritable = new HiveKey(); transient Writable value; @@ -115,14 +113,24 @@ transient ObjectInspector[] partitionObjectInspectors; transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE]; + public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf) + throws HiveException { + this(); + ReduceSinkDesc desc = (ReduceSinkDesc) conf; + this.conf = desc; + vContext.setOperatorType(OperatorType.REDUCESINK); + keyEval = vContext.getVectorExpressions(desc.getKeyCols()); + valueEval = vContext.getVectorExpressions(desc.getValueCols()); + partitionEval = vContext.getVectorExpressions(desc.getPartitionCols()); + } + + public VectorReduceSinkOperator() { + super(); + } @Override protected void initializeOp(Configuration hconf) throws HiveException { try { - vContext.setOperatorType(OperatorType.REDUCESINK); - keyEval = vContext.getVectorExpressions(conf.getKeyCols()); - valueEval = vContext.getVectorExpressions(conf.getValueCols()); - partitionEval = vContext.getVectorExpressions(conf.getPartitionCols()); numDistributionKeys = conf.getNumDistributionKeys(); distinctColIndices = conf.getDistinctColumnIndices(); @@ -133,12 +141,12 @@ protected void initializeOp(Configuration hconf) throws HiveException { .newInstance(); keySerializer.initialize(null, keyTableDesc.getProperties()); keyIsText = keySerializer.getSerializedClass().equals(Text.class); - + /* - * Compute and assign the key writers and the key object inspector + * Compute and assign the key writers and the key object inspector */ VectorExpressionWriterFactory.processVectorExpressions( - conf.getKeyCols(), + conf.getKeyCols(), conf.getOutputKeyColumnNames(), new VectorExpressionWriterFactory.Closure() { @Override @@ -148,7 +156,7 @@ public void assign(VectorExpressionWriter[] writers, keyObjectInspector = objectInspector; } }); - + String colNames = ""; for(String colName : conf.getOutputKeyColumnNames()) { colNames = String.format("%s %s", colNames, colName); @@ -160,12 +168,12 @@ public void assign(VectorExpressionWriter[] writers, colNames)); partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols()); - + TableDesc valueTableDesc = conf.getValueSerializeInfo(); valueSerializer = (Serializer) valueTableDesc.getDeserializerClass() .newInstance(); valueSerializer.initialize(null, valueTableDesc.getProperties()); - + /* * Compute and assign the value writers and the value object inspector */ @@ -323,13 +331,6 @@ public void processOp(Object row, int tag) throws HiveException { } } - public VectorReduceSinkOperator ( - VectorizationContext context, - OperatorDesc conf) { - this.vContext = context; - this.conf = (ReduceSinkDesc) conf; - } - /** * @return the name of the operator */ @@ -352,4 +353,28 @@ public boolean opAllowedBeforeMapJoin() { return false; } + public VectorExpression[] getPartitionEval() { + return partitionEval; + } + + public void setPartitionEval(VectorExpression[] partitionEval) { + this.partitionEval = partitionEval; + } + + public VectorExpression[] getValueEval() { + return valueEval; + } + + public void setValueEval(VectorExpression[] valueEval) { + this.valueEval = valueEval; + } + + public VectorExpression[] getKeyEval() { + 
return keyEval; + } + + public void setKeyEval(VectorExpression[] keyEval) { + this.keyEval = keyEval; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index bc88bad..7b3ee02 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; @@ -43,17 +44,27 @@ private static final long serialVersionUID = 1L; - protected transient VectorExpression[] vExpressions; + protected VectorExpression[] vExpressions = null; - private final VectorizationContext vContext; + private transient int [] projectedColumns = null; - private int [] projectedColumns = null; + private transient VectorExpressionWriter [] valueWriters = null; - private VectorExpressionWriter [] valueWriters = null; - - public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) { - this.vContext = ctxt; + public VectorSelectOperator(VectorizationContext vContext, OperatorDesc conf) + throws HiveException { this.conf = (SelectDesc) conf; + List colList = this.conf.getColList(); + vContext.setOperatorType(OperatorType.SELECT); + vExpressions = new VectorExpression[colList.size()]; + for (int i = 0; i < colList.size(); i++) { + vExpressions[i] = vContext.getVectorExpression(colList.get(i)); + String columnName = this.conf.getOutputColumnNames().get(i); + // Update column map with output column names + vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn()); + } + } + + public VectorSelectOperator() { } @Override @@ -67,14 +78,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { List objectInspectors = new ArrayList(); List colList = conf.getColList(); - vContext.setOperatorType(OperatorType.SELECT); - vExpressions = new VectorExpression[colList.size()]; - for (int i = 0; i < colList.size(); i++) { - vExpressions[i] = vContext.getVectorExpression(colList.get(i)); - String columnName = conf.getOutputColumnNames().get(i); - // Update column map with output column names - vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn()); - } valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList); for (VectorExpressionWriter vew : valueWriters) { objectInspectors.add(vew.getObjectInspector()); @@ -141,4 +144,21 @@ static public String getOperatorName() { public OperatorType getType() { return OperatorType.SELECT; } + + @Explain (displayName = "vector expressions") + public VectorExpression[] getvExpressions() { + return vExpressions; + } + + public VectorExpression[] getVExpressions() { + return vExpressions; + } + + public void setvExpressions(VectorExpression[] vExpressions) { + this.vExpressions = vExpressions; + } + + public void setVExpressions(VectorExpression[] vExpressions) { + this.vExpressions = vExpressions; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 9c90230..3d8ade6 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterConstantBooleanVectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr; import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprOrExpr; import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterStringColLikeStringScalar; @@ -157,7 +158,8 @@ private int getInputColumnIndex(String name) { private final Set usedOutputColumns = new HashSet(); int allocateOutputColumn(String columnType) { - return initialOutputCol + allocateOutputColumnInternal(columnType); + int relativeCol = allocateOutputColumnInternal(columnType); + return initialOutputCol + relativeCol; } private int allocateOutputColumnInternal(String columnType) { @@ -192,14 +194,6 @@ void freeOutputColumn(int index) { usedOutputColumns.remove(index-initialOutputCol); } } - - String getOutputColumnType(int index) { - return outputColumnsTypes[index-initialOutputCol]; - } - - int getNumOfOutputColumn() { - return outputColCount; - } } public void setOperatorType(OperatorType opType) { @@ -311,8 +305,22 @@ private VectorExpression getConstantVectorExpression(ExprNodeConstantDesc exprDe return new ConstantVectorExpression(outCol, ((Number) exprDesc.getValue()).doubleValue()); } else if (type.equalsIgnoreCase("string")) { return new ConstantVectorExpression(outCol, ((String) exprDesc.getValue()).getBytes()); + } else if (type.equalsIgnoreCase("boolean")) { + if (this.opType == OperatorType.FILTER) { + if (((Boolean) exprDesc.getValue()).booleanValue()) { + return new FilterConstantBooleanVectorExpression(1); + } else { + return new FilterConstantBooleanVectorExpression(0); + } + } else { + if (((Boolean) exprDesc.getValue()).booleanValue()) { + return new ConstantVectorExpression(outCol, 1); + } else { + return new ConstantVectorExpression(outCol, 0); + } + } } else { - throw new HiveException("Unsupported constant type"); + throw new HiveException("Unsupported constant type: "+type.toString()); } } @@ -339,8 +347,7 @@ private VectorExpression getUnaryMinusExpression(List childExprLis + outputColumnType + "ColUnaryMinus"; VectorExpression expr; try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol, outputCol); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol); } catch (Exception ex) { throw new HiveException(ex); } @@ -470,14 +477,14 @@ private VectorExpression getVectorExpression(GenericUDFBridge udf, /* Return a unary string vector expression. This is used for functions like * UPPER() and LOWER(). */ - private VectorExpression getUnaryStringExpression(String vectorExprClassName, + private VectorExpression getUnaryStringExpression(String vectorExprClassName, String resultType, // result type name List childExprList) throws HiveException { - + /* Create an instance of the class vectorExprClassName for the input column or expression result * and return it. 
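The boolean branch added to getConstantVectorExpression() above splits on the operator context: under a FILTER the constant becomes FilterConstantBooleanVectorExpression(1) or (0), i.e. a pass-all or drop-all filter, while in any other operator it is materialized as a long 1/0 ConstantVectorExpression in the allocated output column. A small sketch of that decision; FilterConst and ProjectedConst are hypothetical stand-ins, not Hive classes:

// Hypothetical stand-ins for FilterConstantBooleanVectorExpression and
// ConstantVectorExpression, kept only to make the branching concrete.
final class FilterConst {
  final long value;
  FilterConst(long value) { this.value = value; }
}

final class ProjectedConst {
  final int outputColumn;
  final long value;
  ProjectedConst(int outputColumn, long value) {
    this.outputColumn = outputColumn;
    this.value = value;
  }
}

final class BooleanConstantMapper {
  // Mirrors the FILTER vs. non-FILTER split for boolean constants.
  static Object map(boolean constant, boolean inFilterContext, int outputColumn) {
    long asLong = constant ? 1 : 0;
    return inFilterContext ? new FilterConst(asLong) : new ProjectedConst(outputColumn, asLong);
  }
}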
*/ - + ExprNodeDesc childExpr = childExprList.get(0); int inputCol; VectorExpression v1 = null; @@ -497,8 +504,7 @@ private VectorExpression getUnaryStringExpression(String vectorExprClassName, + vectorExprClassName; VectorExpression expr; try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol, outputCol); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol, outputCol); } catch (Exception ex) { throw new HiveException(ex); } @@ -517,23 +523,23 @@ private VectorExpression getLikeExpression(List childExpr) throws VectorExpression expr = null; int inputCol; ExprNodeConstantDesc constDesc; - + if ((leftExpr instanceof ExprNodeColumnDesc) && (rightExpr instanceof ExprNodeConstantDesc) ) { ExprNodeColumnDesc leftColDesc = (ExprNodeColumnDesc) leftExpr; constDesc = (ExprNodeConstantDesc) rightExpr; inputCol = getInputColumnIndex(leftColDesc.getColumn()); - expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, - new Text((byte[]) getScalarValue(constDesc))); + expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, + new Text((byte[]) getScalarValue(constDesc))); } else if ((leftExpr instanceof ExprNodeGenericFuncDesc) && (rightExpr instanceof ExprNodeConstantDesc)) { v1 = getVectorExpression(leftExpr); inputCol = v1.getOutputColumn(); constDesc = (ExprNodeConstantDesc) rightExpr; - expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, - new Text((byte[]) getScalarValue(constDesc))); + expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, + new Text((byte[]) getScalarValue(constDesc))); } - // TODO add logic to handle cases where left input is an expression. + // TODO add logic to handle cases where left input is an expression. if (expr == null) { throw new HiveException("Vector LIKE filter expression could not be initialized"); } @@ -558,8 +564,8 @@ private VectorExpression getTimestampFieldExpression(String udf, // org.apache.hadoop.hive.ql.exec.vector.expressions.VectorUDFYearLong String vectorUDF = pkg + ".Vector"+udf+"Long"; try { - VectorExpression v2 = (VectorExpression)Class.forName(vectorUDF). - getDeclaredConstructors()[0].newInstance(inputCol,outputCol); + VectorExpression v2 = (VectorExpression) getConstructor(vectorUDF). + newInstance(inputCol,outputCol); return v2; } catch(Exception e) { e.printStackTrace(); @@ -594,8 +600,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, int outputCol = ocm.allocateOutputColumn(getOutputColType(colType, scalarType, method)); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol, getScalarValue(constDesc), outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -612,8 +617,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, String outputColType = getOutputColType(colType, scalarType, method); int outputCol = ocm.allocateOutputColumn(outputColType); try { - expr = (VectorExpression) Class.forName(className). 
- getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc), + expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc), inputCol, outputCol); } catch (Exception ex) { throw new HiveException("Could not instantiate: "+className, ex); @@ -631,8 +635,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, colType2, method); int outputCol = ocm.allocateOutputColumn(outputColType); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2, outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -650,8 +653,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, colType2, method); int outputCol = ocm.allocateOutputColumn(outputColType); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2, outputCol); } catch (Exception ex) { throw new HiveException((ex)); @@ -669,8 +671,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, String className = getBinaryColumnScalarExpressionClassName(colType1, scalarType, method); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, getScalarValue(constDesc), outputCol); } catch (Exception ex) { throw new HiveException((ex)); @@ -689,8 +690,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, String className = getBinaryColumnColumnExpressionClassName(colType1, colType2, method); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2, outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -708,8 +708,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, String className = getBinaryScalarColumnExpressionClassName(colType2, scalarType, method); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc), + expr = (VectorExpression) getConstructor(className).newInstance(getScalarValue(constDesc), inputCol2, outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -730,8 +729,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, String className = getBinaryColumnColumnExpressionClassName(colType1, colType2, method); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2, outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -864,13 +862,13 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnScalarExpressionClassName(colType, scalarType, opName); try { - expr = (VectorExpression) Class.forName(className). 
- getDeclaredConstructors()[0].newInstance(inputCol, + Constructor ctor = getConstructor(className); + expr = (VectorExpression) ctor.newInstance(inputCol, getScalarValue(constDesc)); } catch (Exception ex) { throw new HiveException(ex); } - } else if ((leftExpr instanceof ExprNodeConstantDesc) && + } else if ((leftExpr instanceof ExprNodeConstantDesc) && (rightExpr instanceof ExprNodeColumnDesc)) { ExprNodeConstantDesc constDesc = (ExprNodeConstantDesc) leftExpr; ExprNodeColumnDesc rightColDesc = (ExprNodeColumnDesc) rightExpr; @@ -880,8 +878,8 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterScalarColumnExpressionClassName(colType, scalarType, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol, + //Constructor + expr = (VectorExpression) getConstructor(className).newInstance(inputCol, getScalarValue(constDesc)); } catch (Exception ex) { throw new HiveException(ex); @@ -897,8 +895,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnColumnExpressionClassName(colType1, colType2, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2); } catch (Exception ex) { throw new HiveException(ex); } @@ -913,8 +910,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnColumnExpressionClassName(colType1, colType2, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2); } catch (Exception ex) { throw new HiveException(ex); } @@ -930,8 +926,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnColumnExpressionClassName(colType1, colType2, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2); } catch (Exception ex) { throw new HiveException(ex); } @@ -946,8 +941,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnScalarExpressionClassName(colType1, scalarType, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol1, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, getScalarValue(constDesc)); } catch (Exception ex) { throw new HiveException(ex); @@ -963,8 +957,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterScalarColumnExpressionClassName(colType, scalarType, opName); try { - expr = (VectorExpression) Class.forName(className). - getDeclaredConstructors()[0].newInstance(inputCol2, + expr = (VectorExpression) getConstructor(className).newInstance(inputCol2, getScalarValue(constDesc)); } catch (Exception ex) { throw new HiveException(ex); @@ -982,8 +975,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String String className = getFilterColumnColumnExpressionClassName(colType1, colType2, opName); try { - expr = (VectorExpression) Class.forName(className). 
- getDeclaredConstructors()[0].newInstance(inputCol1, inputCol2); + expr = (VectorExpression) getConstructor(className).newInstance(inputCol1, inputCol2); } catch (Exception ex) { throw new HiveException(ex); } @@ -998,6 +990,22 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String return expr; } + private Constructor getConstructor(String className) throws HiveException { + try { + Class cl = Class.forName(className); + Constructor [] ctors = cl.getDeclaredConstructors(); + Constructor defaultCtor = cl.getConstructor(); + for (Constructor ctor : ctors) { + if (!ctor.equals(defaultCtor)) { + return ctor; + } + } + throw new HiveException("Only default constructor found"); + } catch (Exception ex) { + throw new HiveException(ex); + } + } + private String getNormalizedTypeName(String colType) throws HiveException { validateInputType(colType); String normalizedType = null; @@ -1244,31 +1252,6 @@ public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc) {"String", BytesColumnVector.class}, }; - private VectorizedRowBatch allocateRowBatch(int rowCount) throws HiveException { - int columnCount = firstOutputColumnIndex + ocm.getNumOfOutputColumn(); - VectorizedRowBatch ret = new VectorizedRowBatch(columnCount, rowCount); - for (int i=0; i < columnCount; ++i) { - String columnTypeName = ocm.getOutputColumnType(i); - for (Object[] columnType: columnTypes) { - if (columnTypeName.equalsIgnoreCase((String)columnType[0])) { - Class columnTypeClass = (Class)columnType[1]; - try { - Constructor ctor = columnTypeClass.getConstructor(int.class); - ret.cols[i] = ctor.newInstance(rowCount); - } - catch(Exception e) { - throw new HiveException ( - String.format( - "Internal exception occured trying to allocate a vectorized column %d of type %s", - i, columnTypeName), - e); - } - } - } - } - return ret; - } - public Map getOutputColumnTypeMap() { Map map = new HashMap(); for (int i = 0; i < ocm.outputColCount; i++) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index a600d74..f5a59f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -68,6 +68,8 @@ // list does not contain partition columns private List colsToInclude; + private Map columnTypeMap = null; + /** * Constructor for VectorizedRowBatchCtx * @@ -124,6 +126,11 @@ public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundEx split.getPath(), IOPrepareCache.get().getPartitionDescMap()); Class serdeclass = part.getDeserializerClass(); + String partitionPath = split.getPath().getParent().toString(); + columnTypeMap = Utilities + .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes() + .get(partitionPath); + if (serdeclass == null) { String className = part.getSerdeClassName(); if ((className == null) || (className.isEmpty())) { @@ -253,6 +260,7 @@ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException } } result.numCols = fieldRefs.size(); + this.addScratchColumnsToBatch(result); return result; } @@ -330,4 +338,27 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti } } } + + private void addScratchColumnsToBatch(VectorizedRowBatch vrb) { + if (columnTypeMap != null && !columnTypeMap.isEmpty()) { + int origNumCols = vrb.numCols; + int newNumCols = vrb.cols.length+columnTypeMap.keySet().size(); + 
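The getConstructor helper introduced above replaces the scattered Class.forName(name).getDeclaredConstructors()[0] calls: it resolves the class once and returns whichever declared constructor is not the no-arg default, failing if only the default exists. A self-contained sketch of the same lookup; unlike the patch, this version also tolerates classes that have no public no-arg constructor at all:

import java.lang.reflect.Constructor;

final class NonDefaultConstructorLookup {
  static Constructor<?> find(String className) throws ReflectiveOperationException {
    Class<?> cl = Class.forName(className);
    Constructor<?> defaultCtor = null;
    try {
      defaultCtor = cl.getConstructor(); // public no-arg constructor, if any
    } catch (NoSuchMethodException ignored) {
      // no public no-arg constructor; every declared constructor is a candidate
    }
    for (Constructor<?> ctor : cl.getDeclaredConstructors()) {
      if (!ctor.equals(defaultCtor)) {
        return ctor; // first constructor that is not the default one
      }
    }
    throw new IllegalStateException("Only a default constructor found for " + className);
  }
}

For example, find("java.lang.StringBuilder") would return one of StringBuilder's argument-taking constructors rather than the no-arg one; which of them comes first is up to the JVM's getDeclaredConstructors() ordering.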
vrb.cols = Arrays.copyOf(vrb.cols, newNumCols); + for (int i = origNumCols; i < newNumCols; i++) { + vrb.cols[i] = allocateColumnVector(columnTypeMap.get(i), + VectorizedRowBatch.DEFAULT_SIZE); + } + vrb.numCols = vrb.cols.length; + } + } + + private ColumnVector allocateColumnVector(String type, int defaultSize) { + if (type.equalsIgnoreCase("double")) { + return new DoubleColumnVector(defaultSize); + } else if (type.equalsIgnoreCase("string")) { + return new BytesColumnVector(defaultSize); + } else { + return new LongColumnVector(defaultSize); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java index 5aa0a95..703096c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java @@ -24,16 +24,22 @@ * This class represents an Or expression. This applies short circuit optimization. */ public class FilterExprOrExpr extends VectorExpression { - private final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE]; - private int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE]; - private final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE]; + private static final long serialVersionUID = 1L; + private transient final int[] initialSelected = new int[VectorizedRowBatch.DEFAULT_SIZE]; + private transient int[] unselected = new int[VectorizedRowBatch.DEFAULT_SIZE]; + private transient final int[] tmp = new int[VectorizedRowBatch.DEFAULT_SIZE]; public FilterExprOrExpr(VectorExpression childExpr1, VectorExpression childExpr2) { + this(); this.childExpressions = new VectorExpression[2]; childExpressions[0] = childExpr1; childExpressions[1] = childExpr2; } + public FilterExprOrExpr() { + super(); + } + @Override public void evaluate(VectorizedRowBatch batch) { int n = batch.size; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java index 3038a98..9ae371e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java @@ -77,6 +77,13 @@ private void initialize(HiveConf hiveConf) { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) { resolvers.add(new BucketingSortingInferenceOptimizer()); } + + // Vectorization should be the last optimization, because it doesn't modify the plan + // or any operators. It makes a very low level transformation to the expressions to + // run in the vectorized mode. 
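addScratchColumnsToBatch() above is what keeps expression evaluation allocation-free at run time: the batch's column array is grown once so that every scratch output column decided at plan time (the type map keyed by absolute column index, taken from MapWork.getScratchColumnVectorTypes() for the partition path) gets a pre-allocated vector of the right type appended after the table and partition columns. A simplified sketch of the append step, with plain arrays standing in for the ColumnVector classes:

import java.util.Arrays;
import java.util.Map;

final class ScratchColumnAppender {
  // cols is the batch's column array; scratchTypes maps absolute column index -> type name.
  static Object[] append(Object[] cols, Map<Integer, String> scratchTypes, int batchSize) {
    if (scratchTypes == null || scratchTypes.isEmpty()) {
      return cols;
    }
    int origNumCols = cols.length;
    Object[] grown = Arrays.copyOf(cols, origNumCols + scratchTypes.size());
    for (int i = origNumCols; i < grown.length; i++) {
      grown[i] = allocate(scratchTypes.get(i), batchSize);
    }
    return grown;
  }

  // Mirrors allocateColumnVector(): double and string get their own vector kind,
  // everything else falls back to a long-backed vector.
  private static Object allocate(String type, int size) {
    if ("double".equalsIgnoreCase(type)) {
      return new double[size];   // stand-in for DoubleColumnVector
    } else if ("string".equalsIgnoreCase(type)) {
      return new byte[size][];   // stand-in for BytesColumnVector
    } else {
      return new long[size];     // stand-in for LongColumnVector
    }
  }
}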
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) { + resolvers.add(new Vectorizer()); + } } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java new file mode 100644 index 0000000..471c799 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -0,0 +1,422 @@ +package org.apache.hadoop.hive.ql.optimizer.physical; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.RowResolver; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.udf.UDFDayOfMonth; +import org.apache.hadoop.hive.ql.udf.UDFHour; +import org.apache.hadoop.hive.ql.udf.UDFLength; +import org.apache.hadoop.hive.ql.udf.UDFLike; +import org.apache.hadoop.hive.ql.udf.UDFLower; +import org.apache.hadoop.hive.ql.udf.UDFMinute; +import org.apache.hadoop.hive.ql.udf.UDFOPDivide; +import org.apache.hadoop.hive.ql.udf.UDFOPMinus; +import org.apache.hadoop.hive.ql.udf.UDFOPMod; +import org.apache.hadoop.hive.ql.udf.UDFOPMultiply; +import org.apache.hadoop.hive.ql.udf.UDFOPNegative; +import org.apache.hadoop.hive.ql.udf.UDFOPPlus; +import org.apache.hadoop.hive.ql.udf.UDFOPPositive; +import org.apache.hadoop.hive.ql.udf.UDFSecond; +import org.apache.hadoop.hive.ql.udf.UDFUpper; +import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear; +import org.apache.hadoop.hive.ql.udf.UDFYear; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp; + +public class Vectorizer implements PhysicalPlanResolver { + + protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class); + + Set supportedDataTypes = new HashSet(); + List> vectorizableTasks = + new ArrayList>(); + Set> supportedGenericUDFs = new HashSet>(); + + Set supportedAggregationUdfs = new HashSet(); + + private final VectorizationContext vectorizationContext = null; + + + public Vectorizer() { + supportedDataTypes.add("int"); + supportedDataTypes.add("smallint"); + supportedDataTypes.add("tinyint"); + supportedDataTypes.add("bigint"); + supportedDataTypes.add("integer"); + supportedDataTypes.add("long"); + supportedDataTypes.add("short"); + supportedDataTypes.add("timestamp"); + supportedDataTypes.add("boolean"); + supportedDataTypes.add("string"); + supportedDataTypes.add("byte"); + supportedDataTypes.add("float"); + supportedDataTypes.add("double"); + + supportedGenericUDFs.add(UDFOPNegative.class); + supportedGenericUDFs.add(UDFOPPositive.class); + supportedGenericUDFs.add(UDFOPPlus.class); + supportedGenericUDFs.add(UDFOPMinus.class); + supportedGenericUDFs.add(UDFOPMultiply.class); + supportedGenericUDFs.add(UDFOPDivide.class); + supportedGenericUDFs.add(UDFOPMod.class); + + supportedGenericUDFs.add(GenericUDFOPEqualOrLessThan.class); + supportedGenericUDFs.add(GenericUDFOPEqualOrGreaterThan.class); + supportedGenericUDFs.add(GenericUDFOPGreaterThan.class); + supportedGenericUDFs.add(GenericUDFOPLessThan.class); + supportedGenericUDFs.add(GenericUDFOPNot.class); + supportedGenericUDFs.add(GenericUDFOPNotEqual.class); + supportedGenericUDFs.add(GenericUDFOPNotNull.class); + supportedGenericUDFs.add(GenericUDFOPNull.class); + supportedGenericUDFs.add(GenericUDFOPOr.class); + supportedGenericUDFs.add(GenericUDFOPAnd.class); + supportedGenericUDFs.add(GenericUDFOPEqual.class); + supportedGenericUDFs.add(GenericUDFToUnixTimeStamp.class); + + supportedGenericUDFs.add(UDFHour.class); + supportedGenericUDFs.add(UDFLength.class); + supportedGenericUDFs.add(UDFMinute.class); + supportedGenericUDFs.add(UDFSecond.class); + supportedGenericUDFs.add(UDFYear.class); + supportedGenericUDFs.add(UDFWeekOfYear.class); + supportedGenericUDFs.add(UDFDayOfMonth.class); + + supportedGenericUDFs.add(UDFLike.class); + supportedGenericUDFs.add(UDFLower.class); + supportedGenericUDFs.add(UDFUpper.class); + + supportedAggregationUdfs.add("min"); + supportedAggregationUdfs.add("max"); + supportedAggregationUdfs.add("count"); + supportedAggregationUdfs.add("sum"); + supportedAggregationUdfs.add("avg"); + supportedAggregationUdfs.add("variance"); + supportedAggregationUdfs.add("var_pop"); + supportedAggregationUdfs.add("var_samp"); + supportedAggregationUdfs.add("std"); + supportedAggregationUdfs.add("stddev"); + supportedAggregationUdfs.add("stddev_pop"); + supportedAggregationUdfs.add("stddev_samp"); + } + + @Override + public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + boolean vectorPath = HiveConf.getBoolVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); + if (!vectorPath) { + LOG.info("Vectorization is disabled"); + return pctx; + } + // Validate if vectorization is possible + // Traverse all the tasks + List> rootTasks = pctx.getRootTasks(); + for (Task t : rootTasks) { + 
validateTask(t, pctx); + } + + if (vectorizableTasks.isEmpty()) { + LOG.info("Query cannot be vectorized"); + HiveConf.setBoolVar(pctx.getConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + return pctx; + } else { + for (Task t : vectorizableTasks) { + LOG.info("Running vectorized execution for task: " + t.getJobID()); + vectorizeTask(t, pctx); + } + } + return pctx; + } + + private void validateTask(Task t, PhysicalContext pctx) { + + // Only map-reduce tasks are candidates for vectorization; child tasks are always examined. + if (t instanceof MapRedTask) { + boolean ret = validateMrTask((MapRedTask) t, pctx); + if (ret) { + vectorizableTasks.add(t); + } + } + if (t.getChildTasks() != null) { + for (Task ct : t.getChildTasks()) { + validateTask(ct, pctx); + } + } + } + + private boolean validateMrTask(MapRedTask mrTask, PhysicalContext pctx) { + MapWork mapWork = mrTask.getWork().getMapWork(); + + // Validate the input format + for (String path : mapWork.getPathToPartitionInfo().keySet()) { + PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path); + List> interfaceList = + Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); + if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { + LOG.debug("Input format: " + pd.getInputFileFormatClassName() + + " does not provide vectorized input"); + return false; + } + } + + for (String alias : mapWork.getAliasToWork().keySet()) { + Operator op = mapWork.getAliasToWork().get(alias); + boolean ret = validateOperator(op, pctx); + if (!ret) { + return false; + } + } + return true; + } + + private boolean validateOperator(Operator op, PhysicalContext pctx) { + boolean ret = false; + switch (op.getType()) { + case GROUPBY: + ret = validateGroupByOperator((GroupByOperator) op, pctx); + break; + case FILTER: + ret = validateFilterOperator((FilterOperator) op, pctx); + break; + case SELECT: + ret = validateSelectOperator((SelectOperator) op, pctx); + break; + case REDUCESINK: + ret = validateReduceSinkOperator((ReduceSinkOperator) op, pctx); + break; + case FILESINK: + case TABLESCAN: + ret = true; + break; + default: + ret = false; + break; + } + return ret; + } + + private boolean validateReduceSinkOperator(ReduceSinkOperator op, PhysicalContext pctx) { + List keyDescs = op.getConf().getKeyCols(); + List partitionDescs = op.getConf().getPartitionCols(); + List valueDesc = op.getConf().getValueCols(); + return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) && + validateExprNodeDesc(valueDesc); + } + + private boolean validateSelectOperator(SelectOperator op, PhysicalContext pctx) { + List descList = op.getConf().getColList(); + for (ExprNodeDesc desc : descList) { + boolean ret = validateExprNodeDesc(desc); + if (!ret) { + return false; + } + } + return true; + } + + private boolean validateFilterOperator(FilterOperator op, PhysicalContext pctx) { + ExprNodeDesc desc = op.getConf().getPredicate(); + return validateExprNodeDesc(desc); + } + + private boolean validateGroupByOperator(GroupByOperator op, PhysicalContext pctx) { + boolean ret = validateExprNodeDesc(op.getConf().getKeys()); + if (!ret) { + return false; + } + return validateAggregationDesc(op.getConf().getAggregators()); + } + + private boolean validateExprNodeDesc(List descs) { + for (ExprNodeDesc d : descs) { + boolean ret = validateExprNodeDesc(d); + if (!ret) { + return false; + } + } + return true; + } + + private boolean validateAggregationDesc(List descs) { + for (AggregationDesc d : descs) { + boolean ret = validateAggregationDesc(d); + if (!ret) { + return false; + } + } + return true;
+ } + + + private boolean validateExprNodeDesc(ExprNodeDesc desc) { + boolean ret = validateDataType(desc.getTypeString()); + if (!ret) { + return false; + } + if (desc instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc; + boolean r = validateGenericUdf(d.getGenericUDF()); + if (!r) { + return false; + } + } + if (desc.getChildren() != null) { + for (ExprNodeDesc d: desc.getChildren()) { + if (!validateExprNodeDesc(d)) { + return false; + } + } + } + return true; + } + + private boolean validateGenericUdf(GenericUDF genericUDF) { + if (genericUDF instanceof GenericUDFBridge) { + Class udf = ((GenericUDFBridge) genericUDF).getUdfClass(); + return supportedGenericUDFs.contains(udf); + } else { + return supportedGenericUDFs.contains(genericUDF.getClass()); + } + } + + private boolean validateAggregationDesc(AggregationDesc aggDesc) { + return supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase()); + } + + private boolean validateDataType(String type) { + return supportedDataTypes.contains(type.toLowerCase()); + } + + private void vectorizeTask(Task t, PhysicalContext pctx) { + if (t instanceof MapRedTask) { + try { + vectorizeMapRedTask((MapRedTask) t, pctx); + } catch (HiveException ex) { + throw new RuntimeException(ex); + } + } + } + + private void vectorizeMapRedTask(MapRedTask mrTask, PhysicalContext pctx) throws HiveException { + MapWork mWork = mrTask.getWork().getMapWork(); + Map> scratchColumnVectorTypes = + new HashMap>(); + for (String onefile : mWork.getPathToAliases().keySet()) { + scratchColumnVectorTypes.put(onefile, new HashMap()); + List aliases = mWork.getPathToAliases().get(onefile); + // It is assumed that the row schema is the same for all the aliases that share a path, + // so a single VectorizationContext is built per path and reused for each alias. + VectorizationContext vContext = null; + for (String alias : aliases) { + Operator op = mWork.getAliasToWork().get(alias); + if (vContext == null) { + vContext = getVectorizationContext(op, pctx); + } + vectorizeOperator(op, vContext); + } + if (vContext != null) { + scratchColumnVectorTypes.put(onefile, vContext.getOutputColumnTypeMap()); + } + } + mWork.setScratchColumnVectorTypes(scratchColumnVectorTypes); + } + + private VectorizationContext getVectorizationContext(Operator op, + PhysicalContext pctx) { + RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver(); + + Map cmap = new HashMap(); + int columnCount = 0; + for (ColumnInfo c : rr.getColumnInfos()) { + if (!c.getIsVirtualCol()) { + cmap.put(c.getInternalName(), columnCount++); + } + } + return new VectorizationContext(cmap, columnCount); + } + + private void vectorizeOperator(Operator op, + VectorizationContext vContext) throws HiveException { + Operator vectorOp = null; + + boolean recursive = true; + + switch (op.getType()) { + case GROUPBY: + vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext); + recursive = false; + break; + case FILTER: + case SELECT: + case FILESINK: + case REDUCESINK: + vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext); + break; + default: + vectorOp = op; + break; + } + + if (vectorOp != op) { + if (op.getParentOperators() != null) { + vectorOp.setParentOperators(op.getParentOperators()); + for (Operator p : op.getParentOperators()) { + int index = p.getChildOperators().indexOf(op); + p.getChildOperators().set(index, vectorOp); + } + } + if (op.getChildOperators() != null) { + vectorOp.setChildOperators(op.getChildOperators()); + for (Operator c : op.getChildOperators()) { + int index = c.getParentOperators().indexOf(op); +
c.getParentOperators().set(index, vectorOp); + } + } + } + + if (recursive) { + if (vectorOp.getChildOperators() != null) { + for (Operator c : vectorOp.getChildOperators()) { + vectorizeOperator(c, vContext); + } + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index ff90bc8..c9da680 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -52,7 +52,7 @@ * distributed on the cluster. The ExecMapper will ultimately deserialize this * class on the data nodes and setup it's operator pipeline accordingly. * - * This class is also used in the explain command any property with the + * This class is also used in the explain command any property with the * appropriate annotation will be displayed in the explain output. */ @SuppressWarnings({"serial", "deprecation"}) @@ -112,6 +112,8 @@ private transient boolean useBucketizedHiveInputFormat; + private Map> scratchColumnVectorTypes = null; + public MapWork() { } @@ -479,4 +481,13 @@ public void configureJobConf(JobConf job) { PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job); } } + + public Map> getScratchColumnVectorTypes() { + return scratchColumnVectorTypes; + } + + public void setScratchColumnVectorTypes( + Map> scratchColumnVectorTypes) { + this.scratchColumnVectorTypes = scratchColumnVectorTypes; + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java index 8881e0f..c877cef 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.HashMap; +import java.util.Map; + import junit.framework.Assert; import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr; @@ -25,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColEqualDoubleScalar; import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.FilterLongColGreaterLongColumn; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.junit.Test; /** @@ -76,9 +81,19 @@ private LongColumnVector getLongVector(int len) { } } + private VectorFilterOperator getAVectorFilterOperator() throws HiveException { + ExprNodeColumnDesc col1Expr = new ExprNodeColumnDesc(Long.class, "col1", "table", false); + Map columnMap = new HashMap(); + columnMap.put("col1", 1); + VectorizationContext vc = new VectorizationContext(columnMap, 1); + FilterDesc fdesc = new FilterDesc(); + fdesc.setPredicate(col1Expr); + return new VectorFilterOperator(vc, fdesc); + } + @Test public void testBasicFilterOperator() throws HiveException { - VectorFilterOperator vfo = new VectorFilterOperator(null, null); + VectorFilterOperator vfo = getAVectorFilterOperator(); VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1); VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0); VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2); @@ -105,7 +120,7 @@ public void testBasicFilterOperator() throws HiveException { @Test public void testBasicFilterLargeData() throws HiveException { - VectorFilterOperator vfo = new VectorFilterOperator(null, null); + VectorFilterOperator vfo = 
getAVectorFilterOperator(); VectorExpression ve1 = new FilterLongColGreaterLongColumn(0,1); VectorExpression ve2 = new FilterLongColEqualDoubleScalar(2, 0); VectorExpression ve3 = new FilterExprAndExpr(ve1,ve2);