diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c61d95b..07d302a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -765,6 +765,9 @@
         "org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList"),
     HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize",
         (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB
+
+    // Vectorization enabled
+    HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
     ;
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index 4da68a0..ca32013 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -273,7 +274,17 @@ public int execute(DriverContext driverContext) {
     ShimLoader.getHadoopShims().prepareJobOutput(job);
     //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
     job.setOutputFormat(HiveOutputFormatImpl.class);
-    job.setMapperClass(ExecMapper.class);
+
+    // Use the vectorized mapper when vectorized execution is enabled
+    boolean vectorPath = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+
+    if (vectorPath) {
+      LOG.info("Going down the vectorization path");
+      job.setMapperClass(VectorExecMapper.class);
+    } else {
+      job.setMapperClass(ExecMapper.class);
+    }
 
     job.setMapOutputKeyClass(HiveKey.class);
     job.setMapOutputValueClass(BytesWritable.class);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 564e166..b48fee5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -99,7 +99,7 @@
   protected transient State state = State.UNINIT;
 
-  static transient boolean fatalError = false; // fatalError is shared acorss
+  protected static transient boolean fatalError = false; // fatalError is shared across
                                                           // all operators
 
   static {
@@ -1480,6 +1480,7 @@ public boolean opAllowedBeforeSortMergeJoin() {
     return true;
   }
 
+  @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
new file mode 100644
index 0000000..63d3e62
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+/**
+ * VectorExecMapper. Vectorized counterpart of ExecMapper: it drives the
+ * map-side operator tree rooted at a VectorMapOperator.
+ *
+ */
+public class VectorExecMapper extends MapReduceBase implements Mapper {
+
+  private VectorMapOperator mo;
+  private Map<String, FetchOperator> fetchOperators;
+  private OutputCollector oc;
+  private JobConf jc;
+  private boolean abort = false;
+  private Reporter rp;
+  public static final Log l4j = LogFactory.getLog("VectorExecMapper");
+  private static boolean done;
+
+  // used to log memory usage periodically
+  public static MemoryMXBean memoryMXBean;
+  private long numRows = 0;
+  private long nextCntr = 1;
+  private MapredLocalWork localWork = null;
+  private boolean isLogInfoEnabled = false;
+
+  private final ExecMapperContext execContext = new ExecMapperContext();
+
+  @Override
+  public void configure(JobConf job) {
+    // Allocate the bean at the beginning -
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+    isLogInfoEnabled = l4j.isInfoEnabled();
+
+    try {
+      l4j.info("conf classpath = "
+          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      l4j.info("thread classpath = "
+          + Arrays.asList(((URLClassLoader) Thread.currentThread()
+          .getContextClassLoader()).getURLs()));
+    } catch (Exception e) {
+      l4j.info("cannot get classpath: " + e.getMessage());
+    }
+    try {
+      jc = job;
+      execContext.setJc(jc);
+      // create map and fetch operators
+      MapredWork mrwork = Utilities.getMapRedWork(job);
+      mo = new VectorMapOperator();
+      mo.setConf(mrwork);
+      // initialize map operator
+      mo.setChildren(job);
+      l4j.info(mo.dump(0));
+      // initialize map local work
+      localWork = mrwork.getMapLocalWork();
+      execContext.setLocalWork(localWork);
+
+      mo.setExecContext(execContext);
+      mo.initializeLocalWork(jc);
+      mo.initialize(jc, null);
+
+      if (localWork == null) {
+        return;
+      }
+
+      // The following code is for mapjoin:
+      // initialize all the dummy ops
+      l4j.info("Initializing dummy operator");
+      List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+      for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+        dummyOp.setExecContext(execContext);
+        dummyOp.initialize(jc, null);
+      }
+
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // will this be true here?
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Map operator initialization failed", e);
+      }
+    }
+  }
+
+  public void map(Object key, Object value, OutputCollector output,
+      Reporter reporter) throws IOException {
+    if (oc == null) {
+      oc = output;
+      rp = reporter;
+      mo.setOutputCollector(oc);
+      mo.setReporter(rp);
+    }
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      if (mo.getDone()) {
+        done = true;
+      } else {
+        // Since there is no concept of a group, we don't invoke
+        // startGroup/endGroup for a mapper
+        mo.process(value);
+        if (isLogInfoEnabled) {
+          numRows++;
+          if (numRows == nextCntr) {
+            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+            l4j.info("ExecMapper: processing " + numRows
+                + " rows: used memory = " + used_memory);
+            nextCntr = getNextCntr(numRows);
+          }
+        }
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private long getNextCntr(long cntr) {
+    // A very simple counter to keep track of the number of rows processed by the
+    // mapper. It dumps every 1 million times, and quickly before that
+    if (cntr >= 1000000) {
+      return cntr + 1000000;
+    }
+
+    return 10 * cntr;
+  }
+
+  @Override
+  public void close() {
+    // No row was processed
+    if (oc == null) {
+      l4j.trace("Close called. no row processed by map.");
+    }
+
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
+    // detecting failed executions by exceptions thrown by the operator tree
+    // ideally hadoop should let us know whether map execution failed or not
+    try {
+      mo.close(abort);
+
+      // close the local work
+      if (localWork != null) {
+        List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
+          dummyOp.close(abort);
+        }
+      }
+
+      if (fetchOperators != null) {
+        MapredLocalWork localWork = mo.getConf().getMapLocalWork();
+        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+          Operator<? extends OperatorDesc> forwardOp = localWork
+              .getAliasToWork().get(entry.getKey());
+          forwardOp.close(abort);
+        }
+      }
+
+      if (isLogInfoEnabled) {
+        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+            + used_memory);
+      }
+
+      reportStats rps = new reportStats(rp);
+      mo.preorderMap(rps);
+      return;
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators", e);
+      }
+    }
+  }
+
+  public static boolean getDone() {
+    return done;
+  }
+
+  public boolean isAbort() {
+    return abort;
+  }
+
+  public void setAbort(boolean abort) {
+    this.abort = abort;
+  }
+
+  public static void setDone(boolean done) {
+    VectorExecMapper.done = done;
+  }
+
+  /**
+   * reportStats.
+ * + */ + public static class reportStats implements Operator.OperatorFunc { + Reporter rp; + + public reportStats(Reporter rp) { + this.rp = rp; + } + + public void func(Operator op) { + Map opStats = op.getStats(); + for (Map.Entry e : opStats.entrySet()) { + if (rp != null) { + rp.incrCounter(e.getKey(), e.getValue()); + } + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java new file mode 100644 index 0000000..989cc2d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java @@ -0,0 +1,1046 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ExecDriver; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack; +import org.apache.hadoop.hive.ql.exec.Stat; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HivePartitioner; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.stats.StatsPublisher; +import org.apache.hadoop.hive.ql.stats.StatsSetupConst; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * File Sink operator implementation. + **/ +public class VectorFileSinkOperator extends TerminalOperator implements + Serializable { + + protected transient HashMap valToPaths; + protected transient int numDynParts; + protected transient List dpColNames; + protected transient DynamicPartitionCtx dpCtx; + protected transient boolean isCompressed; + protected transient Path parent; + protected transient HiveOutputFormat hiveOutputFormat; + protected transient Path specPath; + protected transient String childSpecPathDynLinkedPartitions; + protected transient int dpStartCol; // start column # for DP columns + protected transient List dpVals; // array of values corresponding to DP columns + protected transient List dpWritables; + protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters + protected transient int maxPartitions; + protected transient ListBucketingCtx lbCtx; + protected transient boolean isSkewedStoredAsSubDirectories; + private transient boolean statsCollectRawDataSize; + + private static final transient String[] FATAL_ERR_MSG = { + null, // counter value 0 means no error + "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions" + + ".pernode." + }; + private final VectorizationContext vContext; + + public VectorFileSinkOperator(VectorizationContext context, + OperatorDesc conf) { + this.vContext = context; + this.conf = (FileSinkDesc) conf; + } + + public class FSPaths implements Cloneable { + Path tmpPath; + Path taskOutputTempPath; + Path[] outPaths; + Path[] finalPaths; + RecordWriter[] outWriters; + Stat stat; + + public FSPaths() { + } + + public FSPaths(Path specPath) { + tmpPath = Utilities.toTempPath(specPath); + taskOutputTempPath = Utilities.toTaskTempPath(specPath); + outPaths = new Path[numFiles]; + finalPaths = new Path[numFiles]; + outWriters = new RecordWriter[numFiles]; + stat = new Stat(); + } + + /** + * Update OutPath according to tmpPath. + */ + public Path getTaskOutPath(String taskId) { + return getOutPath(taskId, this.taskOutputTempPath); + } + + + /** + * Update OutPath according to tmpPath. + */ + public Path getOutPath(String taskId) { + return getOutPath(taskId, this.tmpPath); + } + + /** + * Update OutPath according to tmpPath. + */ + public Path getOutPath(String taskId, Path tmp) { + return new Path(tmp, Utilities.toTempPath(taskId)); + } + + /** + * Update the final paths according to tmpPath. + */ + public Path getFinalPath(String taskId) { + return getFinalPath(taskId, this.tmpPath, null); + } + + /** + * Update the final paths according to tmpPath. 
+ */ + public Path getFinalPath(String taskId, Path tmpPath, String extension) { + if (extension != null) { + return new Path(tmpPath, taskId + extension); + } else { + return new Path(tmpPath, taskId); + } + } + + public void setOutWriters(RecordWriter[] out) { + outWriters = out; + } + + public RecordWriter[] getOutWriters() { + return outWriters; + } + + public void closeWriters(boolean abort) throws HiveException { + for (int idx = 0; idx < outWriters.length; idx++) { + if (outWriters[idx] != null) { + try { + outWriters[idx].close(abort); + updateProgress(); + } catch (IOException e) { + throw new HiveException(e); + } + } + } + } + + private void commit(FileSystem fs) throws HiveException { + for (int idx = 0; idx < outPaths.length; ++idx) { + try { + if ((bDynParts || isSkewedStoredAsSubDirectories) + && !fs.exists(finalPaths[idx].getParent())) { + fs.mkdirs(finalPaths[idx].getParent()); + } + if (!fs.rename(outPaths[idx], finalPaths[idx])) { + throw new HiveException("Unable to rename output from: " + + outPaths[idx] + " to: " + finalPaths[idx]); + } + updateProgress(); + } catch (IOException e) { + throw new HiveException("Unable to rename output from: " + + outPaths[idx] + " to: " + finalPaths[idx], e); + } + } + } + + public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException { + for (int idx = 0; idx < outWriters.length; idx++) { + if (outWriters[idx] != null) { + try { + outWriters[idx].close(abort); + if (delete) { + fs.delete(outPaths[idx], true); + } + updateProgress(); + } catch (IOException e) { + throw new HiveException(e); + } + } + } + } + } // class FSPaths + + private static final long serialVersionUID = 1L; + protected transient FileSystem fs; + protected transient Serializer serializer; + protected transient BytesWritable commonKey = new BytesWritable(); + protected transient TableIdEnum tabIdEnum = null; + private transient LongWritable row_count; + private transient boolean isNativeTable = true; + + /** + * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets, + * it is not a good idea to start so many reducers - if the maximum number of reducers is 100, + * each reducer can write 10 files - this way we effectively get 1000 files. + */ + private transient ExprNodeEvaluator[] partitionEval; + private transient int totalFiles; + private transient int numFiles; + private transient boolean multiFileSpray; + private transient final Map bucketMap = new HashMap(); + + private transient ObjectInspector[] partitionObjectInspectors; + private transient HivePartitioner prtner; + private transient final HiveKey key = new HiveKey(); + private transient Configuration hconf; + private transient FSPaths fsp; + private transient boolean bDynParts; + private transient SubStructObjectInspector subSetOI; + private transient int timeOut; // JT timeout in msec. + private transient long lastProgressReport = System.currentTimeMillis(); + + /** + * TableIdEnum. 
+ * + */ + public static enum TableIdEnum { + TABLE_ID_1_ROWCOUNT, + TABLE_ID_2_ROWCOUNT, + TABLE_ID_3_ROWCOUNT, + TABLE_ID_4_ROWCOUNT, + TABLE_ID_5_ROWCOUNT, + TABLE_ID_6_ROWCOUNT, + TABLE_ID_7_ROWCOUNT, + TABLE_ID_8_ROWCOUNT, + TABLE_ID_9_ROWCOUNT, + TABLE_ID_10_ROWCOUNT, + TABLE_ID_11_ROWCOUNT, + TABLE_ID_12_ROWCOUNT, + TABLE_ID_13_ROWCOUNT, + TABLE_ID_14_ROWCOUNT, + TABLE_ID_15_ROWCOUNT; + } + + protected transient boolean autoDelete = false; + protected transient JobConf jc; + Class outputClass; + String taskId; + + private boolean filesCreated = false; + + private void initializeSpecPath() { + // For a query of the type: + // insert overwrite table T1 + // select * from (subq1 union all subq2)u; + // subQ1 and subQ2 write to directories Parent/Child_1 and + // Parent/Child_2 respectively, and union is removed. + // The movetask that follows subQ1 and subQ2 tasks moves the directory + // 'Parent' + + // However, if the above query contains dynamic partitions, subQ1 and + // subQ2 have to write to directories: Parent/DynamicPartition/Child_1 + // and Parent/DynamicPartition/Child_1 respectively. + // The movetask that follows subQ1 and subQ2 tasks still moves the directory + // 'Parent' + if ((!conf.isLinkedFileSink()) || (dpCtx == null)) { + specPath = new Path(conf.getDirName()); + childSpecPathDynLinkedPartitions = null; + return; + } + + specPath = new Path(conf.getParentDir()); + childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName()); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + try { + this.hconf = hconf; + filesCreated = false; + isNativeTable = !conf.getTableInfo().isNonNative(); + multiFileSpray = conf.isMultiFileSpray(); + totalFiles = conf.getTotalFiles(); + numFiles = conf.getNumFiles(); + dpCtx = conf.getDynPartCtx(); + lbCtx = conf.getLbCtx(); + valToPaths = new HashMap(); + taskId = Utilities.getTaskId(hconf); + initializeSpecPath(); + fs = specPath.getFileSystem(hconf); + hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); + isCompressed = conf.getCompressed(); + parent = Utilities.toTempPath(conf.getDirName()); + statsCollectRawDataSize = conf.isStatsCollectRawDataSize(); + + serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance(); + System.out.println("Deserializer class = "+serializer.getClass().toString()); + serializer.initialize(null, conf.getTableInfo().getProperties()); + outputClass = serializer.getSerializedClass(); + + // Timeout is chosen to make sure that even if one iteration takes more than + // half of the script.timeout but less than script.timeout, we will still + // be able to report progress. 
+ timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2; + + if (hconf instanceof JobConf) { + jc = (JobConf) hconf; + } else { + // test code path + jc = new JobConf(hconf, ExecDriver.class); + } + + if (multiFileSpray) { + partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()]; + int i = 0; + for (ExprNodeDesc e : conf.getPartitionCols()) { + partitionEval[i++] = ExprNodeEvaluatorFactory.get(e); + } + + partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector); + prtner = (HivePartitioner) ReflectionUtils.newInstance( + jc.getPartitionerClass(), null); + } + int id = conf.getDestTableId(); + if ((id != 0) && (id <= TableIdEnum.values().length)) { + String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT"; + tabIdEnum = TableIdEnum.valueOf(enumName); + row_count = new LongWritable(); + statsMap.put(tabIdEnum, row_count); + } + + if (dpCtx != null) { + dpSetup(); + } + + if (lbCtx != null) { + lbSetup(); + } + + if (!bDynParts) { + fsp = new FSPaths(specPath); + + // Create all the files - this is required because empty files need to be created for + // empty buckets + // createBucketFiles(fsp); + if (!this.isSkewedStoredAsSubDirectories) { + valToPaths.put("", fsp); // special entry for non-DP case + } + } + + initializeChildren(hconf); + } catch (HiveException e) { + throw e; + } catch (Exception e) { + e.printStackTrace(); + throw new HiveException(e); + } + } + + /** + * Initialize list bucketing information + */ + private void lbSetup() { + this.isSkewedStoredAsSubDirectories = ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir()); + } + + /** + * Set up for dynamic partitioning including a new ObjectInspector for the output row. + */ + private void dpSetup() { + + this.bDynParts = false; + this.numDynParts = dpCtx.getNumDPCols(); + this.dpColNames = dpCtx.getDPColNames(); + this.maxPartitions = dpCtx.getMaxPartitionsPerNode(); + + assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping"; + + if (dpColNames != null && dpColNames.size() > 0) { + this.bDynParts = true; + assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has " + + inputObjInspectors.length; + StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0]; + // remove the last dpMapping.size() columns from the OI + List fieldOI = soi.getAllStructFieldRefs(); + ArrayList newFieldsOI = new ArrayList(); + ArrayList newFieldsName = new ArrayList(); + this.dpStartCol = 0; + for (StructField sf : fieldOI) { + String fn = sf.getFieldName(); + if (!dpCtx.getInputToDPCols().containsKey(fn)) { + newFieldsOI.add(sf.getFieldObjectInspector()); + newFieldsName.add(sf.getFieldName()); + this.dpStartCol++; + } + } + assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty"; + + this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol); + this.dpVals = new ArrayList(numDynParts); + this.dpWritables = new ArrayList(numDynParts); + } + } + + private void createBucketFiles(FSPaths fsp) throws HiveException { + try { + int filesIdx = 0; + Set seenBuckets = new HashSet(); + for (int idx = 0; idx < totalFiles; idx++) { + if (this.getExecContext() != null && this.getExecContext().getFileId() != null) { + LOG.info("replace taskId from execContext "); + + taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId()); + + LOG.info("new taskId: FS " + taskId); + + assert !multiFileSpray; + assert totalFiles == 1; + } + + if 
(multiFileSpray) { + key.setHashCode(idx); + + // Does this hashcode belong to this reducer + int numReducers = totalFiles / numFiles; + + if (numReducers > 1) { + int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities + .getTaskId(hconf))); + + int reducerIdx = prtner.getPartition(key, null, numReducers); + if (currReducer != reducerIdx) { + continue; + } + } + + int bucketNum = prtner.getBucket(key, null, totalFiles); + if (seenBuckets.contains(bucketNum)) { + continue; + } + seenBuckets.add(bucketNum); + + bucketMap.put(bucketNum, filesIdx); + taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum); + } + if (isNativeTable) { + fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId); + LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]); + fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId); + LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]); + } else { + fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath; + } + try { + // The reason to keep these instead of using + // OutputFormat.getRecordWriter() is that + // getRecordWriter does not give us enough control over the file name that + // we create. + String extension = Utilities.getFileExtension(jc, isCompressed, + hiveOutputFormat); + if (!bDynParts && !this.isSkewedStoredAsSubDirectories) { + fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension); + } else { + fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension); + } + + } catch (Exception e) { + e.printStackTrace(); + throw new HiveException(e); + } + LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]); + + if (isNativeTable) { + try { + // in recent hadoop versions, use deleteOnExit to clean tmp files. + autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit( + fs, fsp.outPaths[filesIdx]); + } catch (IOException e) { + throw new HiveException(e); + } + } + + Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc); + // only create bucket files only if no dynamic partitions, + // buckets of dynamic partitions will be created for each newly created partition + fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter( + jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], + reporter); + // increment the CREATED_FILES counter + if (reporter != null) { + reporter.incrCounter(ProgressCounter.CREATED_FILES, 1); + } + filesIdx++; + } + assert filesIdx == numFiles; + + // in recent hadoop versions, use deleteOnExit to clean tmp files. + if (isNativeTable) { + autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]); + } + } catch (HiveException e) { + throw e; + } catch (Exception e) { + e.printStackTrace(); + throw new HiveException(e); + } + + filesCreated = true; + } + + /** + * Report status to JT so that JT won't kill this task if closing takes too long + * due to too many files to close and the NN is overloaded. + * + * @return true if a new progress update is reported, false otherwise. 
+ */ + private boolean updateProgress() { + if (reporter != null && + (System.currentTimeMillis() - lastProgressReport) > timeOut) { + reporter.progress(); + lastProgressReport = System.currentTimeMillis(); + return true; + } else { + return false; + } + } + + Writable recordValue; + + @Override + public void processOp(Object data, int tag) throws HiveException { + + VectorizedRowBatch vrg = (VectorizedRowBatch)data; + + Writable [] records = null; + boolean vectorizedSerde = false; + int outputIterations = 1; + try { + if (serializer instanceof VectorizedSerde) { + recordValue = ((VectorizedSerde) serializer).serializeVector(vrg, + inputObjInspectors[0]); + records = (Writable[]) ((ObjectWritable) recordValue).get(); + vectorizedSerde = true; + outputIterations = vrg.size; + } + } catch (SerDeException e1) { + throw new HiveException(e1); + } + + for (int i = 0; i < outputIterations; i++) { + Writable row = null; + if (vectorizedSerde) { + row = records[i]; + } else { + row = new Text(vrg.toString()); + } + /* Create list bucketing sub-directory only if stored-as-directories is on. */ + String lbDirName = null; + //lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row); + + FSPaths fpaths; + + if (!bDynParts && !filesCreated) { + if (lbDirName != null) { + FSPaths fsp2 = lookupListBucketingPaths(lbDirName); + } else { + createBucketFiles(fsp); + } + } + + // Since File Sink is a terminal operator, forward is not called - so, + // maintain the number of output rows explicitly + if (counterNameToEnum != null) { + ++outputRows; + if (outputRows % 1000 == 0) { + incrCounter(numOutputRowsCntr, outputRows); + outputRows = 0; + } + } + + try { + updateProgress(); + + // if DP is enabled, get the final output writers and prepare the real output row + assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct"; + + if (bDynParts) { + // copy the DP column values from the input row to dpVals + dpVals.clear(); + dpWritables.clear(); + ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts, + (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); + // get a set of RecordWriter based on the DP column values + // pass the null value along to the escaping process to determine what the dir should be + for (Object o : dpWritables) { + if (o == null || o.toString().length() == 0) { + dpVals.add(dpCtx.getDefaultPartitionName()); + } else { + dpVals.add(o.toString()); + } + } + fpaths = getDynOutPaths(dpVals, lbDirName); + + } else { + if (lbDirName != null) { + fpaths = lookupListBucketingPaths(lbDirName); + } else { + fpaths = fsp; + } + } + + rowOutWriters = fpaths.outWriters; + if (conf.isGatherStats()) { + if (statsCollectRawDataSize) { + SerDeStats stats = serializer.getSerDeStats(); + if (stats != null) { + fpaths.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize()); + } + } + fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1); + } + + + if (row_count != null) { + row_count.set(row_count.get() + 1); + } + + if (!multiFileSpray) { + rowOutWriters[0].write(row); + } else { + int keyHashCode = 0; + key.setHashCode(keyHashCode); + int bucketNum = prtner.getBucket(key, null, totalFiles); + int idx = bucketMap.get(bucketNum); + rowOutWriters[idx].write(row); + } + } catch (IOException e) { + throw new HiveException(e); + } + } + } + + /** + * Lookup list bucketing path. 
+ * @param lbDirName + * @return + * @throws HiveException + */ + private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException { + FSPaths fsp2 = valToPaths.get(lbDirName); + if (fsp2 == null) { + fsp2 = createNewPaths(lbDirName); + } + return fsp2; + } + + /** + * create new path. + * + * @param dirName + * @return + * @throws HiveException + */ + private FSPaths createNewPaths(String dirName) throws HiveException { + FSPaths fsp2 = new FSPaths(specPath); + if (childSpecPathDynLinkedPartitions != null) { + fsp2.tmpPath = new Path(fsp2.tmpPath, + dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); + fsp2.taskOutputTempPath = + new Path(fsp2.taskOutputTempPath, + dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions); + } else { + fsp2.tmpPath = new Path(fsp2.tmpPath, dirName); + fsp2.taskOutputTempPath = + new Path(fsp2.taskOutputTempPath, dirName); + } + createBucketFiles(fsp2); + valToPaths.put(dirName, fsp2); + return fsp2; + } + + private FSPaths getDynOutPaths(List row, String lbDirName) throws HiveException { + + FSPaths fp; + + // get the path corresponding to the dynamic partition columns, + String dpDir = getDynPartDirectory(row, dpColNames, numDynParts); + + if (dpDir != null) { + dpDir = appendListBucketingDirName(lbDirName, dpDir); + FSPaths fsp2 = valToPaths.get(dpDir); + + if (fsp2 == null) { + // check # of dp + if (valToPaths.size() > maxPartitions) { + // throw fatal error + incrCounter(fatalErrorCntr, 1); + fatalError = true; + LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions"); + } + fsp2 = createNewPaths(dpDir); + } + fp = fsp2; + } else { + fp = fsp; + } + return fp; + } + + /** + * Append list bucketing dir name to original dir name. + * Skewed columns cannot be partitioned columns. + * @param lbDirName + * @param dpDir + * @return + */ + private String appendListBucketingDirName(String lbDirName, String dpDir) { + StringBuilder builder = new StringBuilder(dpDir); + dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName) + .toString(); + return dpDir; + } + + // given the current input row, the mapping for input col info to dp columns, and # of dp cols, + // return the relative path corresponding to the row. + // e.g., ds=2008-04-08/hr=11 + private String getDynPartDirectory(List row, List dpColNames, int numDynParts) { + assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns"; + return FileUtils.makePartName(dpColNames, row); + } + + @Override + protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) { + errMsg.append("Operator ").append(getOperatorId()).append(" (id=").append(id).append("): "); + errMsg.append(counterCode > FATAL_ERR_MSG.length - 1 ? + "fatal error." 
: + FATAL_ERR_MSG[(int) counterCode]); + // number of partitions exceeds limit, list all the partition names + if (counterCode > 0) { + errMsg.append(lsDir()); + } + } + + // sample the partitions that are generated so that users have a sense of what's causing the error + private String lsDir() { + String specPath = conf.getDirName(); + // need to get a JobConf here because it's not passed through at client side + JobConf jobConf = new JobConf(ExecDriver.class); + Path tmpPath = Utilities.toTempPath(specPath); + StringBuilder sb = new StringBuilder("\n"); + try { + DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); + int numDP = dpCtx.getNumDPCols(); + FileSystem fs = tmpPath.getFileSystem(jobConf); + int level = numDP; + if (conf.isLinkedFileSink()) { + level++; + } + FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs); + sb.append("Sample of ") + .append(Math.min(status.length, 100)) + .append(" partitions created under ") + .append(tmpPath.toString()) + .append(":\n"); + for (int i = 0; i < status.length; ++i) { + sb.append("\t.../"); + sb.append(getPartitionSpec(status[i].getPath(), numDP)) + .append("\n"); + } + sb.append("...\n"); + } catch (Exception e) { + // cannot get the subdirectories, just return the root directory + sb.append(tmpPath).append("...\n").append(e.getMessage()); + e.printStackTrace(); + } finally { + return sb.toString(); + } + } + + private String getPartitionSpec(Path path, int level) { + Stack st = new Stack(); + Path p = path; + for (int i = 0; i < level; ++i) { + st.push(p.getName()); + p = p.getParent(); + } + StringBuilder sb = new StringBuilder(); + while (!st.empty()) { + sb.append(st.pop()); + } + return sb.toString(); + } + + @Override + public void closeOp(boolean abort) throws HiveException { + if (!bDynParts && !filesCreated) { + createBucketFiles(fsp); + } + + lastProgressReport = System.currentTimeMillis(); + if (!abort) { + for (FSPaths fsp : valToPaths.values()) { + fsp.closeWriters(abort); + if (isNativeTable) { + fsp.commit(fs); + } + } + // Only publish stats if this operator's flag was set to gather stats + if (conf.isGatherStats()) { + publishStats(); + } + } else { + // Will come here if an Exception was thrown in map() or reduce(). + // Hadoop always call close() even if an Exception was thrown in map() or + // reduce(). 
+ for (FSPaths fsp : valToPaths.values()) { + fsp.abortWriters(fs, abort, !autoDelete && isNativeTable); + } + } + } + + /** + * @return the name of the operator + */ + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "FS"; + } + + @Override + public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack) + throws HiveException { + try { + if ((conf != null) && isNativeTable) { + String specPath = conf.getDirName(); + DynamicPartitionCtx dpCtx = conf.getDynPartCtx(); + if (conf.isLinkedFileSink() && (dpCtx != null)) { + specPath = conf.getParentDir(); + } + Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, + reporter); + } + } catch (IOException e) { + throw new HiveException(e); + } + super.jobCloseOp(hconf, success, feedBack); + } + + @Override + public OperatorType getType() { + return OperatorType.FILESINK; + } + + @Override + public void augmentPlan() { + PlanUtils.configureOutputJobPropertiesForStorageHandler( + getConf().getTableInfo()); + } + + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { + if (hiveOutputFormat == null) { + try { + hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); + } catch (Exception ex) { + throw new IOException(ex); + } + } + Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), job); + + if (conf.getTableInfo().isNonNative()) { + //check the ouput specs only if it is a storage handler (native tables's outputformats does + //not set the job's output properties correctly) + try { + hiveOutputFormat.checkOutputSpecs(ignored, job); + } catch (NoSuchMethodError e) { + //For BC, ignore this for now, but leave a log message + LOG.warn("HiveOutputFormat should implement checkOutputSpecs() method`"); + } + } + } + + private void publishStats() throws HiveException { + boolean isStatsReliable = conf.isStatsReliable(); + + // Initializing a stats publisher + StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc); + + if (statsPublisher == null) { + // just return, stats gathering should not block the main query + LOG.error("StatsPublishing error: StatsPublisher is not initialized."); + if (isStatsReliable) { + throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg()); + } + return; + } + + if (!statsPublisher.connect(hconf)) { + // just return, stats gathering should not block the main query + LOG.error("StatsPublishing error: cannot connect to database"); + if (isStatsReliable) { + throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg()); + } + return; + } + + String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)); + String spSpec = conf.getStaticSpec() != null ? 
conf.getStaticSpec() : ""; + + for (String fspKey : valToPaths.keySet()) { + FSPaths fspValue = valToPaths.get(fspKey); + String key; + + // construct the key(fileID) to insert into the intermediate stats table + if (fspKey == "") { + // for non-partitioned/static partitioned table, the key for temp storage is + // common key prefix + static partition spec + taskID + String keyPrefix = Utilities.getHashedStatsPrefix( + conf.getStatsAggPrefix() + spSpec, conf.getMaxStatsKeyPrefixLength()); + key = keyPrefix + taskID; + } else { + // for partitioned table, the key is + // common key prefix + static partition spec + DynamicPartSpec + taskID + key = createKeyForStatsPublisher(taskID, spSpec, fspKey); + } + Map statsToPublish = new HashMap(); + for (String statType : fspValue.stat.getStoredStats()) { + statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType))); + } + if (!statsPublisher.publishStat(key, statsToPublish)) { + // The original exception is lost. + // Not changing the interface to maintain backward compatibility + if (isStatsReliable) { + throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg()); + } + } + } + if (!statsPublisher.closeConnection()) { + // The original exception is lost. + // Not changing the interface to maintain backward compatibility + if (isStatsReliable) { + throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg()); + } + } + } + + /** + * This is server side code to create key in order to save statistics to stats database. + * Client side will read it via StatsTask.java aggregateStats(). + * Client side reads it via db query prefix which is based on partition spec. + * Since store-as-subdir information is not part of partition spec, we have to + * remove store-as-subdir information from variable "keyPrefix" calculation. + * But we have to keep store-as-subdir information in variable "key" calculation + * since each skewed value has a row in stats db and "key" is db key, + * otherwise later value overwrites previous value. + * Performance impact due to string handling is minimum since this method is + * only called once in FileSinkOperator closeOp(). + * For example, + * create table test skewed by (key, value) on (('484','val_484') stored as DIRECTORIES; + * skewedValueDirList contains 2 elements: + * 1. key=484/value=val_484 + * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME + * Case #1: Static partition with store-as-sub-dir + * spSpec has SP path + * fspKey has either + * key=484/value=val_484 or + * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME + * After filter, fspKey is empty, storedAsDirPostFix has either + * key=484/value=val_484 or + * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME + * so, at the end, "keyPrefix" doesnt have subdir information but "key" has + * Case #2: Dynamic partition with store-as-sub-dir. 
Assume dp part is hr + * spSpec has SP path + * fspKey has either + * hr=11/key=484/value=val_484 or + * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME + * After filter, fspKey is hr=11, storedAsDirPostFix has either + * key=484/value=val_484 or + * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME + * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has + * @param taskID + * @param spSpec + * @param fspKey + * @return + */ + private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) { + String key; + String newFspKey = fspKey; + String storedAsDirPostFix = ""; + if (isSkewedStoredAsSubDirectories) { + List skewedValueDirList = this.lbCtx.getSkewedValuesDirNames(); + for (String dir : skewedValueDirList) { + newFspKey = newFspKey.replace(dir, ""); + if (!newFspKey.equals(fspKey)) { + storedAsDirPostFix = dir; + break; + } + } + } + String keyPrefix = Utilities.getHashedStatsPrefix( + conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR, + conf.getMaxStatsKeyPrefixLength()); + key = keyPrefix + storedAsDirPostFix + taskID; + return key; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java new file mode 100644 index 0000000..02d7bb9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -0,0 +1,818 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Map operator. This triggers overall map side processing. This is a little + * different from regular operators in that it starts off by processing a + * Writable data structure from a Table (instead of a Hive Object). + **/ +public class VectorMapOperator extends Operator implements Serializable, Cloneable { + + private static final long serialVersionUID = 1L; + + /** + * Counter. 
+ * + */ + public static enum Counter { + DESERIALIZE_ERRORS + } + + private final transient LongWritable deserialize_error_count = new LongWritable(); + private transient Deserializer deserializer; + + private transient Object[] rowWithPart; + private transient Writable[] vcValues; + private transient List vcs; + private transient Object[] rowWithPartAndVC; + private transient StructObjectInspector tblRowObjectInspector; + // convert from partition to table schema + private transient Converter partTblObjectInspectorConverter; + private transient boolean isPartitioned; + private Map opCtxMap; + private final Set listInputPaths = new HashSet(); + + private Map, ArrayList> operatorToPaths; + + private final Map, MapOpCtx> childrenOpToOpCtxMap = + new HashMap, MapOpCtx>(); + + private ArrayList> extraChildrenToClose = null; + + private static class MapInputPath { + String path; + String alias; + Operator op; + + /** + * @param path + * @param alias + * @param op + */ + public MapInputPath(String path, String alias, + Operator op) { + this.path = path; + this.alias = alias; + this.op = op; + } + + @Override + public boolean equals(Object o) { + if (o instanceof MapInputPath) { + MapInputPath mObj = (MapInputPath) o; + return path.equals(mObj.path) && alias.equals(mObj.alias) + && op.equals(mObj.op); + } + + return false; + } + + @Override + public int hashCode() { + int ret = (path == null) ? 0 : path.hashCode(); + ret += (alias == null) ? 0 : alias.hashCode(); + ret += (op == null) ? 0 : op.hashCode(); + return ret; + } + + public Operator getOp() { + return op; + } + + public void setOp(Operator op) { + this.op = op; + } + + } + + private static class MapOpCtx { + private final boolean isPartitioned; + private final StructObjectInspector tblRawRowObjectInspector; // without partition + private final StructObjectInspector partObjectInspector; // partition + private StructObjectInspector rowObjectInspector; + private final Converter partTblObjectInspectorConverter; + private final Object[] rowWithPart; + private Object[] rowWithPartAndVC; + private final Deserializer deserializer; + private String tableName; + private String partName; + + /** + * @param isPartitioned + * @param rowObjectInspector + * @param rowWithPart + */ + public MapOpCtx(boolean isPartitioned, + StructObjectInspector rowObjectInspector, + StructObjectInspector tblRawRowObjectInspector, + StructObjectInspector partObjectInspector, + Object[] rowWithPart, + Object[] rowWithPartAndVC, + Deserializer deserializer, + Converter partTblObjectInspectorConverter) { + this.isPartitioned = isPartitioned; + this.rowObjectInspector = rowObjectInspector; + this.tblRawRowObjectInspector = tblRawRowObjectInspector; + this.partObjectInspector = partObjectInspector; + this.rowWithPart = rowWithPart; + this.rowWithPartAndVC = rowWithPartAndVC; + this.deserializer = deserializer; + this.partTblObjectInspectorConverter = partTblObjectInspectorConverter; + } + + /** + * @return the isPartitioned + */ + public boolean isPartitioned() { + return isPartitioned; + } + + /** + * @return the rowObjectInspector + */ + public StructObjectInspector getRowObjectInspector() { + return rowObjectInspector; + } + + public StructObjectInspector getTblRawRowObjectInspector() { + return tblRawRowObjectInspector; + } + + /** + * @return the rowWithPart + */ + public Object[] getRowWithPart() { + return rowWithPart; + } + + /** + * @return the rowWithPartAndVC + */ + public Object[] getRowWithPartAndVC() { + return rowWithPartAndVC; + } + + /** + * @return the 
deserializer + */ + public Deserializer getDeserializer() { + return deserializer; + } + + public Converter getPartTblObjectInspectorConverter() { + return partTblObjectInspectorConverter; + } + } + + /** + * Initializes this map op as the root of the tree. It sets JobConf & + * MapRedWork and starts initialization of the operator tree rooted at this + * op. + * + * @param hconf + * @param mrwork + * @throws HiveException + */ + public void initializeAsRoot(Configuration hconf, MapredWork mrwork) + throws HiveException { + setConf(mrwork); + setChildren(hconf); + initialize(hconf, null); + } + + private MapOpCtx initObjectInspector(MapredWork conf, + Configuration hconf, String onefile, Map convertedOI) + throws HiveException, + ClassNotFoundException, InstantiationException, IllegalAccessException, + SerDeException { + PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile); + LinkedHashMap partSpec = pd.getPartSpec(); + // Use tblProps in case of unpartitioned tables + Properties partProps = + (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ? + pd.getTableDesc().getProperties() : pd.getProperties(); + + Class serdeclass = pd.getDeserializerClass(); + if (serdeclass == null) { + String className = pd.getSerdeClassName(); + if ((className == null) || (className.isEmpty())) { + throw new HiveException( + "SerDe class or the SerDe class name is not set for table: " + + pd.getProperties().getProperty("name")); + } + serdeclass = hconf.getClassByName(className); + } + + String tableName = String.valueOf(partProps.getProperty("name")); + String partName = String.valueOf(partSpec); + Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); + partDeserializer.initialize(hconf, partProps); + StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer + .getObjectInspector(); + + StructObjectInspector tblRawRowObjectInspector = convertedOI.get(pd.getTableDesc()); + + partTblObjectInspectorConverter = + ObjectInspectorConverters.getConverter(partRawRowObjectInspector, + tblRawRowObjectInspector); + + MapOpCtx opCtx = null; + // Next check if this table has partitions and if so + // get the list of partition names as well as allocate + // the serdes for the partition columns + String pcols = partProps + .getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); + // Log LOG = LogFactory.getLog(MapOperator.class.getName()); + if (pcols != null && pcols.length() > 0) { + String[] partKeys = pcols.trim().split("/"); + List partNames = new ArrayList(partKeys.length); + Object[] partValues = new Object[partKeys.length]; + List partObjectInspectors = new ArrayList( + partKeys.length); + for (int i = 0; i < partKeys.length; i++) { + String key = partKeys[i]; + partNames.add(key); + // Partitions do not exist for this table + if (partSpec == null) { + // for partitionless table, initialize partValue to null + partValues[i] = null; + } else { + partValues[i] = new Text(partSpec.get(key)); + } + partObjectInspectors + .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); + } + StructObjectInspector partObjectInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(partNames, partObjectInspectors); + + Object[] rowWithPart = new Object[2]; + rowWithPart[1] = partValues; + StructObjectInspector rowObjectInspector = ObjectInspectorFactory + .getUnionStructObjectInspector(Arrays + .asList(new StructObjectInspector[] {tblRawRowObjectInspector, partObjectInspector})); + // 
LOG.info("dump " + tableName + " " + partName + " " + + // rowObjectInspector.getTypeName()); + opCtx = new MapOpCtx(true, rowObjectInspector, tblRawRowObjectInspector, partObjectInspector, + rowWithPart, null, partDeserializer, partTblObjectInspectorConverter); + } else { + // LOG.info("dump2 " + tableName + " " + partName + " " + + // rowObjectInspector.getTypeName()); + opCtx = new MapOpCtx(false, tblRawRowObjectInspector, tblRawRowObjectInspector, null, null, + null, partDeserializer, partTblObjectInspectorConverter); + } + opCtx.tableName = tableName; + opCtx.partName = partName; + return opCtx; + } + + /** + * Set the inspectors given a input. Since a mapper can span multiple partitions, the inspectors + * need to be changed if the input changes + **/ + private void setInspectorInput(MapInputPath inp) { + Operator op = inp.getOp(); + + deserializer = opCtxMap.get(inp).getDeserializer(); + isPartitioned = opCtxMap.get(inp).isPartitioned(); + rowWithPart = opCtxMap.get(inp).getRowWithPart(); + rowWithPartAndVC = opCtxMap.get(inp).getRowWithPartAndVC(); + tblRowObjectInspector = opCtxMap.get(inp).getRowObjectInspector(); + partTblObjectInspectorConverter = opCtxMap.get(inp).getPartTblObjectInspectorConverter(); + if (listInputPaths.contains(inp)) { + return; + } + + listInputPaths.add(inp); + + // The op may not be a TableScan for mapjoins + // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key; + // In that case, it will be a Select, but the rowOI need not be ammended + if (op instanceof TableScanOperator) { + StructObjectInspector tblRawRowObjectInspector = + opCtxMap.get(inp).getTblRawRowObjectInspector(); + StructObjectInspector partObjectInspector = opCtxMap.get(inp).partObjectInspector; + TableScanOperator tsOp = (TableScanOperator) op; + TableScanDesc tsDesc = tsOp.getConf(); + if (tsDesc != null) { + this.vcs = tsDesc.getVirtualCols(); + if (vcs != null && vcs.size() > 0) { + List vcNames = new ArrayList(vcs.size()); + this.vcValues = new Writable[vcs.size()]; + List vcsObjectInspectors = new ArrayList(vcs.size()); + for (int i = 0; i < vcs.size(); i++) { + VirtualColumn vc = vcs.get(i); + vcsObjectInspectors.add( + PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( + ((PrimitiveTypeInfo) vc.getTypeInfo()).getPrimitiveCategory())); + vcNames.add(vc.getName()); + } + StructObjectInspector vcStructObjectInspector = ObjectInspectorFactory + .getStandardStructObjectInspector(vcNames, + vcsObjectInspectors); + if (isPartitioned) { + this.rowWithPartAndVC = new Object[3]; + this.rowWithPartAndVC[1] = this.rowWithPart[1]; + } else { + this.rowWithPartAndVC = new Object[2]; + } + if (partObjectInspector == null) { + this.tblRowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays + .asList(new StructObjectInspector[] { + tblRowObjectInspector, vcStructObjectInspector})); + } else { + this.tblRowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays + .asList(new StructObjectInspector[] { + tblRawRowObjectInspector, partObjectInspector, + vcStructObjectInspector})); + } + opCtxMap.get(inp).rowObjectInspector = this.tblRowObjectInspector; + opCtxMap.get(inp).rowWithPartAndVC = this.rowWithPartAndVC; + } + } + } + } + + // Return the mapping for table descriptor to the expected table OI + /** + * Traverse all the partitions for a table, and get the OI for the table. + * Note that a conversion is required if any of the partition OI is different + * from the table OI. For eg. 
+  /**
+   * Set the inspectors given an input. Since a mapper can span multiple partitions, the
+   * inspectors need to be changed if the input changes.
+   **/
+  private void setInspectorInput(MapInputPath inp) {
+    Operator<? extends OperatorDesc> op = inp.getOp();
+
+    deserializer = opCtxMap.get(inp).getDeserializer();
+    isPartitioned = opCtxMap.get(inp).isPartitioned();
+    rowWithPart = opCtxMap.get(inp).getRowWithPart();
+    rowWithPartAndVC = opCtxMap.get(inp).getRowWithPartAndVC();
+    tblRowObjectInspector = opCtxMap.get(inp).getRowObjectInspector();
+    partTblObjectInspectorConverter = opCtxMap.get(inp).getPartTblObjectInspectorConverter();
+    if (listInputPaths.contains(inp)) {
+      return;
+    }
+
+    listInputPaths.add(inp);
+
+    // The op may not be a TableScan for mapjoins.
+    // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key;
+    // In that case, it will be a Select, but the rowOI need not be amended
+    if (op instanceof TableScanOperator) {
+      StructObjectInspector tblRawRowObjectInspector =
+          opCtxMap.get(inp).getTblRawRowObjectInspector();
+      StructObjectInspector partObjectInspector = opCtxMap.get(inp).partObjectInspector;
+      TableScanOperator tsOp = (TableScanOperator) op;
+      TableScanDesc tsDesc = tsOp.getConf();
+      if (tsDesc != null) {
+        this.vcs = tsDesc.getVirtualCols();
+        if (vcs != null && vcs.size() > 0) {
+          List<String> vcNames = new ArrayList<String>(vcs.size());
+          this.vcValues = new Writable[vcs.size()];
+          List<ObjectInspector> vcsObjectInspectors = new ArrayList<ObjectInspector>(vcs.size());
+          for (int i = 0; i < vcs.size(); i++) {
+            VirtualColumn vc = vcs.get(i);
+            vcsObjectInspectors.add(
+                PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+                    ((PrimitiveTypeInfo) vc.getTypeInfo()).getPrimitiveCategory()));
+            vcNames.add(vc.getName());
+          }
+          StructObjectInspector vcStructObjectInspector = ObjectInspectorFactory
+              .getStandardStructObjectInspector(vcNames, vcsObjectInspectors);
+          if (isPartitioned) {
+            this.rowWithPartAndVC = new Object[3];
+            this.rowWithPartAndVC[1] = this.rowWithPart[1];
+          } else {
+            this.rowWithPartAndVC = new Object[2];
+          }
+          if (partObjectInspector == null) {
+            this.tblRowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays
+                .asList(new StructObjectInspector[] {
+                    tblRowObjectInspector, vcStructObjectInspector}));
+          } else {
+            this.tblRowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays
+                .asList(new StructObjectInspector[] {
+                    tblRawRowObjectInspector, partObjectInspector,
+                    vcStructObjectInspector}));
+          }
+          opCtxMap.get(inp).rowObjectInspector = this.tblRowObjectInspector;
+          opCtxMap.get(inp).rowWithPartAndVC = this.rowWithPartAndVC;
+        }
+      }
+    }
+  }
+
+  // Return the mapping from table descriptor to the expected table OI
+  /**
+   * Traverse all the partitions for a table, and get the OI for the table.
+   * Note that a conversion is required if any partition OI is different
+   * from the table OI. For example, if the query references table T (partitions P1, P2),
+   * and P1's schema is the same as T's, whereas P2's schema is different from T's,
+   * conversion might be needed for both P1 and P2, since a SettableOI might be needed for T.
+   */
+  private Map<TableDesc, StructObjectInspector> getConvertedOI(Configuration hconf)
+      throws HiveException {
+    Map<TableDesc, StructObjectInspector> tableDescOI =
+        new HashMap<TableDesc, StructObjectInspector>();
+    Set<TableDesc> identityConverterTableDesc = new HashSet<TableDesc>();
+    try {
+      for (String onefile : conf.getPathToAliases().keySet()) {
+        PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+        TableDesc tableDesc = pd.getTableDesc();
+        Properties tblProps = tableDesc.getProperties();
+        // If the partition does not exist, use table properties
+        Properties partProps =
+            (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ?
+                tblProps : pd.getProperties();
+
+        Class sdclass = pd.getDeserializerClass();
+        if (sdclass == null) {
+          String className = pd.getSerdeClassName();
+          if ((className == null) || (className.isEmpty())) {
+            throw new HiveException(
+                "SerDe class or the SerDe class name is not set for table: "
+                    + pd.getProperties().getProperty("name"));
+          }
+          sdclass = hconf.getClassByName(className);
+        }
+
+        Deserializer partDeserializer = (Deserializer) sdclass.newInstance();
+        partDeserializer.initialize(hconf, partProps);
+        StructObjectInspector partRawRowObjectInspector =
+            (StructObjectInspector) partDeserializer.getObjectInspector();
+
+        StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
+        if ((tblRawRowObjectInspector == null) ||
+            (identityConverterTableDesc.contains(tableDesc))) {
+          sdclass = tableDesc.getDeserializerClass();
+          if (sdclass == null) {
+            String className = tableDesc.getSerdeClassName();
+            if ((className == null) || (className.isEmpty())) {
+              throw new HiveException(
+                  "SerDe class or the SerDe class name is not set for table: "
+                      + tableDesc.getProperties().getProperty("name"));
+            }
+            sdclass = hconf.getClassByName(className);
+          }
+          Deserializer tblDeserializer = (Deserializer) sdclass.newInstance();
+          tblDeserializer.initialize(hconf, tblProps);
+          tblRawRowObjectInspector =
+              (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
+                  partRawRowObjectInspector,
+                  (StructObjectInspector) tblDeserializer.getObjectInspector());
+
+          if (identityConverterTableDesc.contains(tableDesc)) {
+            if (!partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
+              identityConverterTableDesc.remove(tableDesc);
+            }
+          } else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
+            identityConverterTableDesc.add(tableDesc);
+          }
+
+          tableDescOI.put(tableDesc, tblRawRowObjectInspector);
+        }
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+    return tableDescOI;
+  }
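The conversion that this javadoc describes is delegated to ObjectInspectorConverters: getConvertedOI yields a settable OI in the table's shape, and getConverter produces the converter that rewrites each partition row into that shape. A sketch under hypothetical schemas (an old partition still storing a column as a string while the table now declares it as int):

// Self-contained sketch (hypothetical schemas) of the partition-OI to table-OI conversion
// performed in getConvertedOI() / initObjectInspector().
import java.util.Arrays;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class ConvertedOISketch {
  public static void main(String[] args) {
    // An old partition that still stores "key" as a string.
    StructObjectInspector partRawOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.writableStringObjectInspector));

    // The table now declares "key" as an int.
    StructObjectInspector tblDeclaredOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("key"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.writableIntObjectInspector));

    // Settable OI in the table's shape, plus a converter from the partition's shape into it.
    StructObjectInspector tblOI = (StructObjectInspector)
        ObjectInspectorConverters.getConvertedOI(partRawOI, tblDeclaredOI);
    Converter converter = ObjectInspectorConverters.getConverter(partRawOI, tblOI);

    Object partRow = Arrays.<Object>asList(new Text("42"));
    Object converted = converter.convert(partRow);          // now inspectable as struct<key:int>
    System.out.println(tblOI.getStructFieldsDataAsList(converted));
  }
}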
System.out.println("Column Name: " + sfield.getFieldName() + ", " + + "column index: " + columnCount); + LOG.info("Column Name: " + sfield.getFieldName() + ", " + + "column index: " + columnCount); + columnCount++; + } + + Path onepath = new Path(new Path(onefile).toUri().getPath()); + List aliases = conf.getPathToAliases().get(onefile); + + VectorizationContext vectorizationContext = new VectorizationContext + (columnMap, + columnCount); + + for (String onealias : aliases) { + Operator op = conf.getAliasToWork().get( + onealias); + LOG.info("Adding alias " + onealias + " to work list for file " + + onefile); + + Operator vectorOp = vectorizeOperator(op, + vectorizationContext); + + System.out.println("Using vectorized op: "+ vectorOp.getName()); + LOG.info("Using vectorized op: " + vectorOp.getName()); + op = vectorOp; + MapInputPath inp = new MapInputPath(onefile, onealias, op); + opCtxMap.put(inp, opCtx); + if (operatorToPaths.get(op) == null) { + operatorToPaths.put(op, new ArrayList()); + } + operatorToPaths.get(op).add(onefile); + op.setParentOperators(new ArrayList>()); + op.getParentOperators().add(this); + // check for the operators who will process rows coming to this Map + // Operator + if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { + children.add(op); + childrenOpToOpCtxMap.put(op, opCtx); + LOG.info("dump " + op.getName() + " " + + opCtxMap.get(inp).getRowObjectInspector().getTypeName()); + } + setInspectorInput(inp); + } + } + + if (children.size() == 0) { + // didn't find match for input file path in configuration! + // serious problem .. + LOG.error("Configuration does not have any alias for path: " + + fpath.toUri().getPath()); + throw new HiveException("Configuration and input path are inconsistent"); + } + + // we found all the operators that we are supposed to process. 
+  private Operator<? extends OperatorDesc> vectorizeOperator(
+      Operator<? extends OperatorDesc> op,
+      VectorizationContext vectorizationContext) throws HiveException {
+
+    Operator<? extends OperatorDesc> vectorOp;
+    boolean recursive = true;
+
+    switch (op.getType()) {
+    case GROUPBY:
+      vectorOp = new VectorGroupByOperator(vectorizationContext, op.getConf());
+      recursive = false;
+      break;
+    case FILTER:
+      vectorOp = new VectorFilterOperator(vectorizationContext, op.getConf());
+      break;
+    case SELECT:
+      vectorOp = new VectorSelectOperator(vectorizationContext, op.getConf());
+      break;
+    case FILESINK:
+      vectorOp = new VectorFileSinkOperator(vectorizationContext,
+          op.getConf());
+      break;
+    case TABLESCAN:
+      vectorOp = op;
+      break;
+    default:
+      throw new HiveException("Operator: " + op.getName() + ", " +
+          "not vectorized");
+    }
+
+    if (recursive) {
+      List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+      if (children != null && !children.isEmpty()) {
+        List<Operator<? extends OperatorDesc>> vectorizedChildren =
+            new ArrayList<Operator<? extends OperatorDesc>>(children.size());
+        for (Operator<? extends OperatorDesc> childOp : children) {
+          Operator<? extends OperatorDesc> vectorizedChild =
+              this.vectorizeOperator(childOp, vectorizationContext);
+          List<Operator<? extends OperatorDesc>> parentList =
+              new ArrayList<Operator<? extends OperatorDesc>>();
+          parentList.add(vectorOp);
+          vectorizedChild.setParentOperators(parentList);
+          vectorizedChildren.add(vectorizedChild);
+        }
+        vectorOp.setChildOperators(vectorizedChildren);
+      }
+    } else {
+      // transfer the row-mode children to the vectorized op parent
+      List<Operator<? extends OperatorDesc>> children = op.getChildOperators();
+      if (children != null && !children.isEmpty()) {
+        List<Operator<? extends OperatorDesc>> parentList =
+            new ArrayList<Operator<? extends OperatorDesc>>();
+        parentList.add(vectorOp);
+        for (Operator<? extends OperatorDesc> childOp : children) {
+          childOp.setParentOperators(parentList);
+        }
+        vectorOp.setChildOperators(children);
+      }
+    }
+
+    return vectorOp;
+  }
+
+  private void plugIntermediate(Operator<? extends OperatorDesc> parent,
+      Operator<? extends OperatorDesc> plug) {
+
+    List<Operator<? extends OperatorDesc>> plugList =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    plugList.add(plug);
+
+    List<Operator<? extends OperatorDesc>> parentAsList =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    parentAsList.add(parent);
+
+    List<Operator<? extends OperatorDesc>> children = parent.getChildOperators();
+    if (children != null && !children.isEmpty()) {
+      for (Operator<? extends OperatorDesc> childOp : children) {
+        childOp.setParentOperators(plugList);
+      }
+    }
+    plug.setChildOperators(children);
+    plug.setParentOperators(parentAsList);
+    parent.setChildOperators(plugList);
+  }
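Both vectorizeOperator and plugIntermediate are small rewrites of the operator DAG, where every node keeps a parent list and a child list that must stay consistent on both sides of each edge. A toy, Hive-free sketch of the plugIntermediate rewiring, with a hypothetical Node type standing in for Operator:

// Toy sketch (hypothetical Node type) of the rewiring done by plugIntermediate():
// insert a node between a parent and all of its current children while keeping
// parent and child lists consistent.
import java.util.ArrayList;
import java.util.List;

public class OperatorRewireSketch {
  static class Node {
    final String name;
    List<Node> parents = new ArrayList<Node>();
    List<Node> children = new ArrayList<Node>();
    Node(String name) { this.name = name; }
  }

  static void plugIntermediate(Node parent, Node plug) {
    List<Node> children = parent.children;
    for (Node child : children) {
      child.parents = new ArrayList<Node>();
      child.parents.add(plug);            // children now point up at the plug
    }
    plug.children = children;             // plug adopts the old children
    plug.parents = new ArrayList<Node>();
    plug.parents.add(parent);
    parent.children = new ArrayList<Node>();
    parent.children.add(plug);            // parent now points down at the plug only
  }

  public static void main(String[] args) {
    Node scan = new Node("TS");
    Node sink = new Node("FS");
    scan.children.add(sink);
    sink.parents.add(scan);
    plugIntermediate(scan, new Node("FIL"));
    System.out.println(scan.children.get(0).name + " -> "
        + scan.children.get(0).children.get(0).name);  // FIL -> FS
  }
}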
+  @Override
+  public void initializeOp(Configuration hconf) throws HiveException {
+    // set that parent initialization is done and call initialize on children
+    state = State.INIT;
+    List<Operator<? extends OperatorDesc>> children = getChildOperators();
+
+    for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
+        .entrySet()) {
+      Operator<? extends OperatorDesc> child = entry.getKey();
+      MapOpCtx mapOpCtx = entry.getValue();
+      // Add alias, table name, and partitions to the hadoop conf so that their
+      // children will inherit these
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+          mapOpCtx.tableName);
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME,
+          mapOpCtx.partName);
+      child.initialize(hconf, new ObjectInspector[] {mapOpCtx.getRowObjectInspector()});
+    }
+
+    for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
+      // Add alias, table name, and partitions to the hadoop conf so that their
+      // children will inherit these
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+          entry.getValue().tableName);
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME,
+          entry.getValue().partName);
+      MapInputPath input = entry.getKey();
+      Operator<? extends OperatorDesc> op = input.op;
+      // op is not in the children list, so need to remember it and close it
+      // afterwards
+      if (children.indexOf(op) == -1) {
+        if (extraChildrenToClose == null) {
+          extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
+        }
+        extraChildrenToClose.add(op);
+        op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()});
+      }
+    }
+  }
+
+  /**
+   * Close extra child operators that are initialized but are not executed.
+   */
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if (extraChildrenToClose != null) {
+      for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
+        op.close(abort);
+      }
+    }
+  }
+
+  // Change the serializer etc. since it is a new file, and a split can span
+  // multiple files/partitions.
+  @Override
+  public void cleanUpInputFileChangedOp() throws HiveException {
+    Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
+        .toUri().getPath());
+
+    for (String onefile : conf.getPathToAliases().keySet()) {
+      Path onepath = new Path(new Path(onefile).toUri().getPath());
+      // check for the operators that will process rows coming to this Map
+      // Operator
+      if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+        String onealias = conf.getPathToAliases().get(onefile).get(0);
+        Operator<? extends OperatorDesc> op =
+            conf.getAliasToWork().get(onealias);
+
+        LOG.info("Processing alias " + onealias + " for file " + onefile);
+
+        MapInputPath inp = new MapInputPath(onefile, onealias, op);
+        setInspectorInput(inp);
+        break;
+      }
+    }
+  }
+
+  public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
+      List<VirtualColumn> vcs, Writable[] vcValues, Deserializer deserializer) {
+    if (vcs == null) {
+      return vcValues;
+    }
+    if (vcValues == null) {
+      vcValues = new Writable[vcs.size()];
+    }
+    for (int i = 0; i < vcs.size(); i++) {
+      VirtualColumn vc = vcs.get(i);
+      if (vc.equals(VirtualColumn.FILENAME)) {
+        if (ctx.inputFileChanged()) {
+          vcValues[i] = new Text(ctx.getCurrentInputFile());
+        }
+      } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) {
+        long current = ctx.getIoCxt().getCurrentBlockStart();
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
+        }
+      } else if (vc.equals(VirtualColumn.ROWOFFSET)) {
+        long current = ctx.getIoCxt().getCurrentRow();
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
+        }
+      } else if (vc.equals(VirtualColumn.RAWDATASIZE)) {
+        long current = 0L;
+        SerDeStats stats = deserializer.getSerDeStats();
+        if (stats != null) {
+          current = stats.getRawDataSize();
+        }
+        LongWritable old = (LongWritable) vcValues[i];
+        if (old == null) {
+          old = new LongWritable(current);
+          vcValues[i] = old;
+          continue;
+        }
+        if (current != old.get()) {
+          old.set(current);
+        }
+      }
+    }
+    return vcValues;
+  }
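populateVirtualColumnValues deliberately reuses the cached LongWritable instances and only calls set() when the value changes, instead of allocating a new writable per row. The same pattern in isolation, using only hadoop-common types:

// The reuse pattern applied to BLOCKOFFSET/ROWOFFSET/RAWDATASIZE above:
// allocate the Writable once, then mutate it in place on later rows.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

public class WritableReuseSketch {
  public static void main(String[] args) {
    Writable[] vcValues = new Writable[1];
    long[] offsets = {0L, 0L, 128L, 256L};       // hypothetical block offsets per row

    for (long current : offsets) {
      LongWritable old = (LongWritable) vcValues[0];
      if (old == null) {
        vcValues[0] = new LongWritable(current); // first row: allocate
      } else if (current != old.get()) {
        old.set(current);                        // later rows: mutate in place
      }
      System.out.println(vcValues[0]);
    }
  }
}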
+  public void process(Object value) throws HiveException {
+    // A mapper can span multiple files/partitions.
+    // The serializers need to be reset if the input file has changed.
+    if ((this.getExecContext() != null) &&
+        this.getExecContext().inputFileChanged()) {
+      // The child operators clean up if the input file has changed
+      cleanUpInputFileChanged();
+    }
+
+    // The row has been converted to comply with the table schema, irrespective of the
+    // partition schema. So, use tblOI (and not partOI) for forwarding.
+    try {
+      if (value instanceof VectorizedRowBatch) {
+        forward(value, null);
+      } else {
+        Object row = this.partTblObjectInspectorConverter.convert(
+            deserializer.deserialize((Writable) value));
+        forward(row, tblRowObjectInspector);
+      }
+    } catch (Exception e) {
+      throw new HiveException("Hive Runtime Error while processing ", e);
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    throw new HiveException("Hive 2 Internal error: should not be called!");
+  }
+
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  public static String getOperatorName() {
+    return "MAP";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return null;
+  }
+
+}
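Finally, a hypothetical driver loop (not part of the patch) showing the contract that process() exposes above: the caller may hand it either a VectorizedRowBatch or a serialized row, and closes the operator tree when the input is exhausted. The record source here is assumed, not taken from the patch.

// Hypothetical driver sketch: feed records into the vectorized map operator.
import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class ProcessLoopSketch {
  // `records` may mix VectorizedRowBatch objects and Writable rows;
  // process() dispatches on the runtime type, as shown above.
  public static void drive(VectorMapOperator mapOp, List<Object> records) throws HiveException {
    try {
      for (Object record : records) {
        mapOp.process(record);
      }
    } finally {
      mapOp.close(false);   // flush and close the whole operator tree
    }
  }
}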