diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index ddaaa21..9af5768 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
@@ -54,6 +56,7 @@
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -597,7 +600,7 @@ public void processOp(Object data, int tag) throws HiveException {
     }
     /* Create list bucketing sub-directory only if stored-as-directories is on. */
     String lbDirName = null;
-    //lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+    lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
 
     FSPaths fpaths;
 
@@ -682,6 +685,52 @@ public void processOp(Object data, int tag) throws HiveException {
   }
 
   /**
+   * Generate list bucketing directory name from a row.
+   * @param row row to process.
+   * @return directory name.
+   */
+  private String generateListBucketingDirName(Object row) {
+    if (!this.isSkewedStoredAsSubDirectories) {
+      return null;
+    }
+
+    String lbDirName = null;
+    List<Object> standObjs = new ArrayList<Object>();
+    List<String> skewedCols = lbCtx.getSkewedColNames();
+    List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
+    List<String> skewedValsCandidate = null;
+    Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
+
+    /* Convert input row to standard objects. */
+    ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+        (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+
+    assert (standObjs.size() >= skewedCols.size()) :
+      "The row has less number of columns than no. of skewed column.";
+
+    skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+    for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
+      skewedValsCandidate.add(posPair.getSkewColPosition(),
+          standObjs.get(posPair.getTblColPosition()).toString());
+    }
+    /* The row matches skewed column names. */
+    if (allSkewedVals.contains(skewedValsCandidate)) {
+      /* matches skewed values. */
+      lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
+      locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
+    } else {
+      /* create default directory. */
+      lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+          lbCtx.getDefaultDirName());
+      List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
+      if (!locationMap.containsKey(defaultKey)) {
+        locationMap.put(new SkewedValueList(defaultKey), lbDirName);
+      }
+    }
+    return lbDirName;
+  }
+
+  /**
    * Lookup list bucketing path.
   * @param lbDirName
   * @return
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index adbf45a..3f6a121 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -412,6 +412,9 @@ public void setChildren(Configuration hconf) throws HiveException {
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
 
     Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
+    Map<String, Operator<? extends OperatorDesc>> aliasToVectorOpMap =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+
     try {
       for (String onefile : conf.getPathToAliases().keySet()) {
         MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
@@ -440,8 +443,12 @@ public void setChildren(Configuration hconf) throws HiveException {
           LOG.info("Adding alias " + onealias + " to work list for file "
               + onefile);
-          Operator<? extends OperatorDesc> vectorOp = vectorizeOperator(op,
-              vectorizationContext);
+          Operator<? extends OperatorDesc> vectorOp = aliasToVectorOpMap.get(onealias);
+
+          if (vectorOp == null) {
+            vectorOp = vectorizeOperator(op, vectorizationContext);
+            aliasToVectorOpMap.put(onealias, vectorOp);
+          }
 
           System.out.println("Using vectorized op: "+ vectorOp.getName());
           LOG.info("Using vectorized op: " + vectorOp.getName());
 
@@ -499,8 +506,7 @@ public void setChildren(Configuration hconf) throws HiveException {
       vectorOp = new VectorSelectOperator(vectorizationContext, op.getConf());
       break;
     case FILESINK:
-      vectorOp = new VectorFileSinkOperator(vectorizationContext,
-          op.getConf());
+      vectorOp = new VectorFileSinkOperator(vectorizationContext, op.getConf());
       break;
     case TABLESCAN:
       vectorOp = op.cloneOp();
@@ -631,7 +637,25 @@ public void closeOp(boolean abort) throws HiveException {
   // multiple files/partitions.
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
-
+    Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
+        .toUri().getPath());
+
+    for (String onefile : conf.getPathToAliases().keySet()) {
+      Path onepath = new Path(new Path(onefile).toUri().getPath());
+      // check for the operators who will process rows coming to this Map
+      // Operator
+      if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+        String onealias = conf.getPathToAliases().get(onefile).get(0);
+        Operator<? extends OperatorDesc> op =
+            conf.getAliasToWork().get(onealias);
+
+        LOG.info("Processing alias " + onealias + " for file " + onefile);
+
+        MapInputPath inp = new MapInputPath(onefile, onealias, op);
+        //setInspectorInput(inp);
+        break;
+      }
+    }
   }
 
   public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 5018ea1..03bd873 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -279,7 +279,7 @@ private int GetColIndexBasedOnColName(String colName) throws HiveException {
 
     List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
     for (int i = 0; i < fieldRefs.size(); i++) {
-      if (fieldRefs.get(i).getFieldName() == colName) {
+      if (fieldRefs.get(i).getFieldName().equals(colName)) {
        return i;
      }
    }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
index 25b3aed..5fb4b69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
@@ -58,6 +58,7 @@
   private VectorizedRowBatchCtx rbCtx;
   private final LongWritable keyCache = new LongWritable();
   private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
+  private boolean addPartitionCols = true;
 
   private static RCFileSyncCache syncCache = new RCFileSyncCache();
 
@@ -149,10 +150,6 @@ public VectorizedRowBatch createValue() {
     VectorizedRowBatch result = null;
     try {
       result = rbCtx.CreateVectorizedRowBatch();
-      // Since the record reader works only on one split and
-      // given a split the partition cannot change, we are setting the partition
-      // values only once during batch creation
-      rbCtx.AddPartitionColsToBatch(result);
     } catch (HiveException e) {
       new RuntimeException("Error creating a batch", e);
     }
@@ -175,9 +172,18 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
     int i = 0;
     try {
+      for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
       more = next(keyCache);
       if (more) {
+        // Check and update partition cols if necessary. Ideally this should be done
+        // in CreateValue() as the partition is constant per split. But since Hive uses
+        // CombineHiveRecordReader and as this does not call CreateValue() for
+        // each new RecordReader it creates, this check is required in next()
+        if (addPartitionCols) {
+          rbCtx.AddPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
         in.getCurrentRow(colsCache);
         // Currently RCFile reader does not support reading vectorized
         // data. Populating the batch by adding one row at a time.
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 2c20987..211b7e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -46,7 +45,7 @@
  * A MapReduce/Hive input format for ORC files.
  */
 public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-  implements InputFormatChecker, VectorizedInputFormatInterface {
+    implements InputFormatChecker, VectorizedInputFormatInterface {
 
   private static class VectorizedOrcRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -55,6 +54,7 @@
     private final long length;
     private float progress = 0.0f;
     private VectorizedRowBatchCtx rbCtx;
+    private boolean addPartitionCols = true;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
@@ -78,10 +78,19 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       if (!reader.hasNext()) {
         return false;
       }
-      reader.nextBatch(value);
       try {
-        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value);
-      } catch (SerDeException e) {
+        // Check and update partition cols if necessary. Ideally, this should be done
+        // in CreateValue as the partition is constant per split. But since Hive uses
+        // CombineHiveRecordReader and
+        // as this does not call CreateValue for each new RecordReader it creates, this check is
+        // required in next()
+        if (addPartitionCols) {
+          rbCtx.AddPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
+        reader.nextBatch(value);
+        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
+      } catch (Exception e) {
         new RuntimeException(e);
       }
       progress = reader.getProgress();
@@ -98,10 +107,6 @@ public VectorizedRowBatch createValue() {
       VectorizedRowBatch result = null;
       try {
         result = rbCtx.CreateVectorizedRowBatch();
-        // Since the record reader works only on one split and
-        // given a split the partition cannot change, we are setting the partition
-        // values only once during batch creation
-        rbCtx.AddPartitionColsToBatch(result);
       } catch (HiveException e) {
         new RuntimeException("Error creating a batch", e);
       }
@@ -131,29 +136,36 @@ public VectorizedOrcInputFormat() {
 
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
-   * @param types the types of the file
-   * @param result the global view of columns that should be included
-   * @param typeId the root of tree to enable
+   *
+   * @param types
+   *          the types of the file
+   * @param result
+   *          the global view of columns that should be included
+   * @param typeId
+   *          the root of tree to enable
    */
   private static void includeColumnRecursive(List<OrcProto.Type> types,
-                                             boolean[] result,
-                                             int typeId) {
+      boolean[] result,
+      int typeId) {
     result[typeId] = true;
     OrcProto.Type type = types.get(typeId);
     int children = type.getSubtypesCount();
-    for(int i=0; i < children; ++i) {
+    for (int i = 0; i < children; ++i) {
       includeColumnRecursive(types, result, type.getSubtypes(i));
     }
   }
 
   /**
    * Take the configuration and figure out which columns we need to include.
-   * @param types the types of the file
-   * @param conf the configuration
+   *
+   * @param types
+   *          the types of the file
+   * @param conf
+   *          the configuration
    * @return true for each column that should be included
    */
   private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
-                                               Configuration conf) {
+      Configuration conf) {
     String includedStr =
         conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
     if (includedStr == null || includedStr.trim().length() == 0) {
@@ -164,13 +176,13 @@ private static void includeColumnRecursive(List<OrcProto.Type> types,
     result[0] = true;
     OrcProto.Type root = types.get(0);
     List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
-    for(int i=0; i < root.getSubtypesCount(); ++i) {
+    for (int i = 0; i < root.getSubtypesCount(); ++i) {
       if (included.contains(i)) {
         includeColumnRecursive(types, result, root.getSubtypes(i));
       }
     }
     // if we are filtering at least one column, return the boolean array
-    for(boolean include: result) {
+    for (boolean include : result) {
       if (!include) {
         return result;
       }
@@ -182,7 +194,7 @@ private static void includeColumnRecursive(List<OrcProto.Type> types,
   @Override
   public RecordReader<NullWritable, VectorizedRowBatch>
       getRecordReader(InputSplit inputSplit, JobConf conf,
-                      Reporter reporter) throws IOException {
+          Reporter reporter) throws IOException {
     FileSplit fileSplit = (FileSplit) inputSplit;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
@@ -192,8 +204,8 @@ private static void includeColumnRecursive(List<OrcProto.Type> types,
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-                               ArrayList<FileStatus> files
-                               ) throws IOException {
+      ArrayList<FileStatus> files
+      ) throws IOException {
     if (files.size() <= 0) {
       return false;
     }