diff --git accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java index 66ab01e..3d21eb3 100644 --- accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java +++ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/AccumuloSerDe.java @@ -140,7 +140,7 @@ public ObjectInspector getObjectInspector() throws SerDeException { } public SerDeStats getSerDeStats() { - throw new UnsupportedOperationException("SerdeStats not supported."); + return null; } public AccumuloSerDeParameters getParams() { diff --git contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java index 72f4234..13e5436 100644 --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java @@ -285,11 +285,4 @@ private static void serializeNoEncode(ByteStream.Output out, Object obj, } throw new RuntimeException("Unknown category type: "+ objInspector.getCategory()); } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java index aadfb51..b661e89 100644 --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -266,11 +265,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) outputRowText.set(outputRowString); return outputRowText; } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java index ea87bf6..9053119 100644 --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -383,9 +382,4 @@ private void serializeField(Object o, ObjectInspector oi, Object reuse) } } } - - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } } diff --git contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java index ce445b0..3f459e9 100644 --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java +++ 
contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.AbstractDeserializer; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -205,11 +204,4 @@ public static void main(String[] args) { } } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java index 47e20d5..0afc24c 100644 --- hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java +++ hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -286,12 +285,6 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe } } - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - public HBaseKeyFactory getKeyFactory() { return serdeParams.getKeyFactory(); } diff --git itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java index c28f096..6d9e636 100644 --- itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java +++ itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe1.java @@ -104,11 +104,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { return null; } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java index 05d0590..e9162b0 100644 --- itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java +++ itests/custom-serde/src/main/java/org/apache/hadoop/hive/serde2/CustomSerDe2.java @@ -104,11 +104,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { return null; } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git itests/test-serde/src/main/java/org/apache/hadoop/hive/serde2/TestSerDe.java itests/test-serde/src/main/java/org/apache/hadoop/hive/serde2/TestSerDe.java index 9f7a20a..2184e9d 100644 --- itests/test-serde/src/main/java/org/apache/hadoop/hive/serde2/TestSerDe.java +++ itests/test-serde/src/main/java/org/apache/hadoop/hive/serde2/TestSerDe.java @@ -200,11 +200,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe serializeCache.set(sb.toString()); return serializeCache; } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; 
- } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 0ccab02..c7d3b23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveRecordReader; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.parse.SplitSample; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeSpec; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; @@ -95,6 +96,7 @@ private StructObjectInspector vcsOI; private List vcCols; private ExecMapperContext context; + private VirtualColumns.Builder builder; private transient Deserializer tableSerDe; private transient StructObjectInspector tableOI; @@ -141,19 +143,8 @@ public FetchOperator(FetchWork work, JobConf job, Operator operator, } private void initialize() throws HiveException { - if (isStatReader) { - outputOI = work.getStatRowOI(); - return; - } if (hasVC) { - List names = new ArrayList(vcCols.size()); - List inspectors = new ArrayList(vcCols.size()); - for (VirtualColumn vc : vcCols) { - inspectors.add(vc.getObjectInspector()); - names.add(vc.getName()); - } - vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); - vcValues = new Object[vcCols.size()]; + vcsOI = VirtualColumns.getVCSObjectInspector(vcCols); } if (hasVC && isPartitioned) { row = new Object[3]; @@ -306,6 +297,9 @@ static void setFetchOperatorContext(JobConf conf, List paths) { if (isPartitioned) { row[1] = createPartValue(currDesc, partKeyOI); } + if (hasVC) { + builder = VirtualColumns.builder(vcCols, context, currSerDe); + } iterSplits = Arrays.asList(splits).iterator(); if (LOG.isDebugEnabled()) { @@ -482,8 +476,7 @@ public InspectableObject getNextRow() throws IOException { operator.cleanUpInputFileChanged(); } if (hasVC) { - row[isPartitioned ? 2 : 1] = - MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, currSerDe); + row[isPartitioned ? 
2 : 1] = builder.evaluate(); } Object deserialized = currSerDe.deserialize(value); if (ObjectConverter != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index c4f04cb..a3617da 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -30,12 +30,12 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 4f3d504..79966d5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -156,6 +156,7 @@ public FSPaths(Path specPath) { * Update OutPath according to tmpPath. */ public Path getTaskOutPath(String taskId) { + LOG.warn("OUTPUT PATH : " + taskId + " in " + taskOutputTempPath); return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId)); } @@ -163,6 +164,7 @@ public Path getTaskOutPath(String taskId) { * Update the final paths according to tmpPath. 
*/ public Path getFinalPath(String taskId, Path tmpPath, String extension) { + LOG.warn("FINAL PATH : " + taskId + ":" + extension + " in " + tmpPath); if (extension != null) { return new Path(tmpPath, taskId + extension); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 6d06e9e..d135cf2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -36,9 +36,8 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -47,8 +46,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; @@ -57,7 +56,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -116,7 +114,8 @@ String tableName; String partName; List vcs; - Object[] vcValues; + + VirtualColumns.Builder builder; public MapOpCtx(String alias, Operator op, PartitionDesc partDesc) { this.alias = alias; @@ -132,17 +131,19 @@ private boolean hasVC() { return vcsObjectInspector != null; } + private void activate(ExecMapperContext context) { + if (hasVC()) { + builder = VirtualColumns.builder(vcs, context, deserializer); + } + } + private Object readRow(Writable value, ExecMapperContext context) throws SerDeException { Object deserialized = deserializer.deserialize(value); Object row = partTblObjectInspectorConverter.convert(deserialized); if (hasVC()) { rowWithPartAndVC[0] = row; - if (context != null) { - populateVirtualColumnValues(context, vcs, vcValues, deserializer); - } - int vcPos = isPartitioned() ? 2 : 1; - rowWithPartAndVC[vcPos] = vcValues; - return rowWithPartAndVC; + rowWithPartAndVC[(isPartitioned() ? 
2 : 1)] = builder.evaluate(); + return rowWithPartAndVC; } else if (isPartitioned()) { rowWithPart[0] = row; return rowWithPart; @@ -250,8 +251,7 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx, TableScanDesc tsDesc = tsOp.getConf(); if (tsDesc != null && tsDesc.hasVirtualCols()) { opCtx.vcs = tsDesc.getVirtualCols(); - opCtx.vcValues = new Object[opCtx.vcs.size()]; - opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs); + opCtx.vcsObjectInspector = VirtualColumns.getVCSObjectInspector(opCtx.vcs); if (opCtx.isPartitioned()) { opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3); } else { @@ -438,7 +438,9 @@ public void closeOp(boolean abort) throws HiveException { @Override public void cleanUpInputFileChangedOp() throws HiveException { super.cleanUpInputFileChangedOp(); - Path fpath = getExecContext().getCurrentInputPath(); + ExecMapperContext execContext = getExecContext(); + + Path fpath = execContext.getCurrentInputPath(); String nominalPath = getNominalPath(fpath); Map, MapOpCtx> contexts = opCtxMap.get(nominalPath); if (isLogInfoEnabled) { @@ -449,16 +451,16 @@ public void cleanUpInputFileChangedOp() throws HiveException { } builder.append(context.alias); } - if (isLogDebugEnabled) { - LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath); - } + LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath); } + // Add alias, table name, and partitions to hadoop conf so that their // children will inherit these for (Entry, MapOpCtx> entry : contexts.entrySet()) { Operator operator = entry.getKey(); MapOpCtx context = entry.getValue(); operator.setInputContext(nominalPath, context.tableName, context.partName); + context.activate(execContext); } currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]); } @@ -530,75 +532,6 @@ private String toErrorMessage(Writable value, Object row, ObjectInspector inspec } } - public static Object[] populateVirtualColumnValues(ExecMapperContext ctx, - List vcs, Object[] vcValues, Deserializer deserializer) { - if (vcs == null) { - return vcValues; - } - if (vcValues == null) { - vcValues = new Object[vcs.size()]; - } - for (int i = 0; i < vcs.size(); i++) { - VirtualColumn vc = vcs.get(i); - if (vc.equals(VirtualColumn.FILENAME)) { - if (ctx.inputFileChanged()) { - vcValues[i] = new Text(ctx.getCurrentInputPath().toString()); - } - } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) { - long current = ctx.getIoCxt().getCurrentBlockStart(); - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } else if (vc.equals(VirtualColumn.ROWOFFSET)) { - long current = ctx.getIoCxt().getCurrentRow(); - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } else if (vc.equals(VirtualColumn.RAWDATASIZE)) { - long current = 0L; - SerDeStats stats = deserializer.getSerDeStats(); - if(stats != null) { - current = stats.getRawDataSize(); - } - LongWritable old = (LongWritable) vcValues[i]; - if (old == null) { - old = new LongWritable(current); - vcValues[i] = old; - continue; - } - if (current != old.get()) { - old.set(current); - } - } - else if(vc.equals(VirtualColumn.ROWID)) { - if(ctx.getIoCxt().ri == null) { - vcValues[i] = null; - } - else { - if(vcValues[i] == null) { 
- vcValues[i] = new Object[RecordIdentifier.Field.values().length]; - } - RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]); - ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't - //happen since IO layer either knows how to produce ROW__ID or not - but to be safe - } - } - } - return vcValues; - } - @Override public void processOp(Object row, int tag) throws HiveException { throw new HiveException("Hive 2 Internal error: should not be called!"); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cb010fb..a04f06a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -31,13 +31,13 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent; import org.apache.hadoop.hive.ql.stats.StatsPublisher; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -148,7 +148,7 @@ private void gatherStats(Object row) { (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE); for (Object o : writable) { - // It's possible that a parition column may have NULL value, in which case the row belongs + // It's possible that a partition column may have NULL value, in which case the row belongs // to the special partition, __HIVE_DEFAULT_PARTITION__. values.add(o == null ? 
defaultPartitionName : o.toString()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 94ae932..08aa2af 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -217,6 +217,7 @@ public void close() { throw new RuntimeException("Hive Runtime Error while closing operators", e); } } finally { + execContext.clear(); MapredContext.close(); Utilities.clearWorkMap(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java index 8b92f32..6099e02 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.mr; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -25,9 +27,12 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.serde2.RecordIdentifier; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; import org.apache.hadoop.mapred.JobConf; -public class ExecMapperContext { +public class ExecMapperContext implements VirtualColumnProvider { public static final Log l4j = ExecMapper.l4j; @@ -72,7 +77,7 @@ public void clear() { } /** - * For CompbineFileInputFormat, the mapper's input file will be changed on the + * For CombineFileInputFormat, the mapper's input file will be changed on the * fly, and the input file name is passed to jobConf by shims/initNextRecordReader. 
* If the map local work has any mapping depending on the current * mapper's input file, the work need to clear context and re-initialization @@ -155,4 +160,37 @@ public IOContext getIoCxt() { public void setIoCxt(IOContext ioCxt) { this.ioCxt = ioCxt; } + + @Override + public List getVirtualColumns() { + return Arrays.asList( + VirtualColumn.FILENAME, VirtualColumn.BLOCKOFFSET, VirtualColumn.ROWOFFSET, + VirtualColumn.ROWID); + } + + private transient Object[] rowID; + + @Override + public Object evaluate(VirtualColumn vc) { + if (vc.equals(VirtualColumn.FILENAME)) { + return getCurrentInputPath().toString(); + } + if (vc.equals(VirtualColumn.BLOCKOFFSET)) { + return ioCxt.getCurrentBlockStart(); + } + if (vc.equals(VirtualColumn.ROWOFFSET)) { + return ioCxt.getCurrentRow(); + } + if (ioCxt.ri != null && vc.equals(VirtualColumn.ROWID)) { + if (rowID == null) { + rowID = new Object[RecordIdentifier.Field.values().length]; + } + RecordIdentifier.StructInfo.toArray(ioCxt.ri, rowID); + ioCxt.ri = null;//so we don't accidentally cache the value; shouldn't + //happen since IO layer either knows how to produce ROW__ID or not - but to be safe + return rowID; + } + return null; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java index 5ce7553..4f50a77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.sql.Timestamp; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -274,4 +276,9 @@ public void deserializeVector(Object rowBlob, int rowsInBlob, } } } + + @Override + public List getVirtualColumns() { + return Collections.emptyList(); // do not provide + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index e67996d..268604b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -34,9 +34,9 @@ import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index b076933..0ede02d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; @@ -46,7 +45,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -56,6 +54,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** * Index handler for the bitmap index. Bitmap index uses an EWAH-compressed diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 0ca5d22..cb8b7e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -50,7 +49,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler.DecomposedPredicate; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -65,6 +63,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde2.VirtualColumn; public class CompactIndexHandler extends TableBasedIndexHandler { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index e1d2395..8b7f2b8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.InputFormat; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index 96d7b1e..21c96ff 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -38,6 +38,7 @@ import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -169,7 +170,7 @@ private void initIOContext(long startPos, boolean isBlockPointer, ioCxtRef = this.getIOContext(); ioCxtRef.currentBlockStart = startPos; ioCxtRef.isBlockPointer = isBlockPointer; - ioCxtRef.inputPath = inputPath; + ioCxtRef.setInputPath(inputPath); LOG.info("Processing file " + inputPath); initDone = true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index d42f568..1446a3a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.RecordIdentifier; /** @@ -75,7 +76,7 @@ public static void clear() { // The class name of the generic UDF being used by the filter String genericUDFClassName = null; /** - * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID} + * supports {@link org.apache.hadoop.hive.serde2.VirtualColumn#ROWID} */ public RecordIdentifier ri; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java deleted file mode 100644 index cdde3dc..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.WritableComparable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Gives the Record identifier information for the current record. 
- */ -public class RecordIdentifier implements WritableComparable { - /** - * This is in support of {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID} - * Contains metadata about each field in RecordIdentifier that needs to be part of ROWID - * which is represented as a struct {@link org.apache.hadoop.hive.ql.io.RecordIdentifier.StructInfo}. - * Each field of RecordIdentifier which should be part of ROWID should be in this enum... which - * really means that it should be part of VirtualColumn (so make a subclass for rowid). - */ - public static enum Field { - //note the enum names match field names in the struct - transactionId(TypeInfoFactory.longTypeInfo, - PrimitiveObjectInspectorFactory.javaLongObjectInspector), - bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), - rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); - public final TypeInfo fieldType; - public final ObjectInspector fieldOI; - Field(TypeInfo fieldType, ObjectInspector fieldOI) { - this.fieldType = fieldType; - this.fieldOI = fieldOI; - } - } - /** - * RecordIdentifier is passed along the operator tree as a struct. This class contains a few - * utilities for that. - */ - public static final class StructInfo { - private static final List fieldNames = new ArrayList(Field.values().length); - private static final List fieldTypes = new ArrayList(fieldNames.size()); - private static final List fieldOis = - new ArrayList(fieldNames.size()); - static { - for(Field f : Field.values()) { - fieldNames.add(f.name()); - fieldTypes.add(f.fieldType); - fieldOis.add(f.fieldOI); - } - } - public static final TypeInfo typeInfo = - TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); - public static final ObjectInspector oi = - ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOis); - - /** - * Copies relevant fields from {@code ri} to {@code struct} - * @param ri - * @param struct must be of size Field.values().size() - */ - public static void toArray(RecordIdentifier ri, Object[] struct) { - assert struct != null && struct.length == Field.values().length; - if(ri == null) { - Arrays.fill(struct, null); - return; - } - struct[Field.transactionId.ordinal()] = ri.getTransactionId(); - struct[Field.bucketId.ordinal()] = ri.getBucketId(); - struct[Field.rowId.ordinal()] = ri.getRowId(); - } - } - - private long transactionId; - private int bucketId; - private long rowId; - - public RecordIdentifier() { - } - - public RecordIdentifier(long transactionId, int bucket, long rowId) { - this.transactionId = transactionId; - this.bucketId = bucket; - this.rowId = rowId; - } - - /** - * Set the identifier. - * @param transactionId the transaction id - * @param bucketId the bucket id - * @param rowId the row id - */ - public void setValues(long transactionId, int bucketId, long rowId) { - this.transactionId = transactionId; - this.bucketId = bucketId; - this.rowId = rowId; - } - - /** - * Set this object to match the given object. - * @param other the object to copy from - */ - public void set(RecordIdentifier other) { - this.transactionId = other.transactionId; - this.bucketId = other.bucketId; - this.rowId = other.rowId; - } - - public void setRowId(long rowId) { - this.rowId = rowId; - } - - /** - * What was the original transaction id for the last row? - * @return the transaction id - */ - public long getTransactionId() { - return transactionId; - } - - /** - * What was the original bucket id for the last row? 
- * @return the bucket id - */ - public int getBucketId() { - return bucketId; - } - - /** - * What was the original row id for the last row? - * @return the row id - */ - public long getRowId() { - return rowId; - } - - protected int compareToInternal(RecordIdentifier other) { - if (other == null) { - return -1; - } - if (transactionId != other.transactionId) { - return transactionId < other.transactionId ? -1 : 1; - } - if (bucketId != other.bucketId) { - return bucketId < other.bucketId ? - 1 : 1; - } - if (rowId != other.rowId) { - return rowId < other.rowId ? -1 : 1; - } - return 0; - } - - @Override - public int compareTo(RecordIdentifier other) { - if (other.getClass() != RecordIdentifier.class) { - return -other.compareTo(this); - } - return compareToInternal(other); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(transactionId); - dataOutput.writeInt(bucketId); - dataOutput.writeLong(rowId); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - transactionId = dataInput.readLong(); - bucketId = dataInput.readInt(); - rowId = dataInput.readLong(); - } - - @Override - public boolean equals(Object other) { - if (other == null || other.getClass() != getClass()) { - return false; - } - RecordIdentifier oth = (RecordIdentifier) other; - return oth.transactionId == transactionId && - oth.bucketId == bucketId && - oth.rowId == rowId; - } - - @Override - public String toString() { - return "{originalTxn: " + transactionId + ", bucket: " + - bucketId + ", row: " + getRowId() + "}"; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 5be2b4f..de3ae81 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 144f21e..771bacb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -27,8 +27,8 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.serde2.RecordIdentifier; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; @@ -44,6 +44,7 @@ import java.util.Deque; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; /** @@ -631,13 +632,15 @@ public ObjectInspector getObjectInspector() { // NOTE: if "columns.types" is missing, all columns will be 
of String type String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); + Set vcs = VirtualColumns.toNames(VirtualColumns.getRegistry(conf, null)); + // Parse the configuration parameters ArrayList columnNames = new ArrayList(); Deque virtualColumns = new ArrayDeque(); if (columnNameProperty != null && columnNameProperty.length() > 0) { String[] colNames = columnNameProperty.split(","); for (int i = 0; i < colNames.length; i++) { - if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) { + if (vcs.contains(colNames[i])) { virtualColumns.addLast(i); } else { columnNames.add(colNames[i]); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 44bc391..a6c41f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index a8e5c2e..2bfb0c4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 3f8e4d7..10605e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.util.StringUtils; @@ -57,7 +57,7 @@ * @return list with virtual columns removed */ private static List getColumns(final String columns) { - return (List) VirtualColumn. + return (List) VirtualColumns. 
removeVirtualColumns(StringUtils.getStringCollection(columns)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java index 9caa4ed..4a8aa4a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -76,7 +78,7 @@ */ @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES, ParquetOutputFormat.COMPRESSION}) -public class ParquetHiveSerDe extends AbstractSerDe { +public class ParquetHiveSerDe extends AbstractSerDe implements VirtualColumnProvider { public static final Text MAP_KEY = new Text("key"); public static final Text MAP_VALUE = new Text("value"); public static final Text MAP = new Text("map"); @@ -335,4 +337,16 @@ public SerDeStats getSerDeStats() { } return stats; } + + @Override + public List getVirtualColumns() { + return Arrays.asList(VirtualColumn.RAWDATASIZE); + } + + @Override + public Object evaluate(VirtualColumn vc) { + assert vc == VirtualColumn.RAWDATASIZE; + assert (status != LAST_OPERATION.UNKNOWN); + return status == LAST_OPERATION.SERIALIZE ? serializedSize : deserializedSize; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java deleted file mode 100644 index ecc5d92..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.metadata; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.ListIterator; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.classification.InterfaceAudience; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - -@InterfaceAudience.Private -public class VirtualColumn implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final VirtualColumn FILENAME = new VirtualColumn("INPUT__FILE__NAME", (PrimitiveTypeInfo)TypeInfoFactory.stringTypeInfo); - public static final VirtualColumn BLOCKOFFSET = new VirtualColumn("BLOCK__OFFSET__INSIDE__FILE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); - public static final VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); - - public static final VirtualColumn RAWDATASIZE = new VirtualColumn("RAW__DATA__SIZE", (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo); - /** - * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier} - */ - public static final VirtualColumn ROWID = new VirtualColumn("ROW__ID", RecordIdentifier.StructInfo.typeInfo, true, RecordIdentifier.StructInfo.oi); - - /** - * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE. - * It composes a bit vector with the "0" and "1" values for every - * column which is GROUP BY section. "1" is for a row in the result - * set if that column has been aggregated in that row. Otherwise the - * value is "0". Returns the decimal representation of the bit vector. 
- */ - public static final VirtualColumn GROUPINGID = - new VirtualColumn("GROUPING__ID", (PrimitiveTypeInfo) TypeInfoFactory.intTypeInfo); - - public static ImmutableSet VIRTUAL_COLUMN_NAMES = - ImmutableSet.of(FILENAME.getName(), BLOCKOFFSET.getName(), ROWOFFSET.getName(), - RAWDATASIZE.getName(), GROUPINGID.getName(), ROWID.getName()); - - private final String name; - private final TypeInfo typeInfo; - private final boolean isHidden; - private final ObjectInspector oi; - - private VirtualColumn(String name, PrimitiveTypeInfo typeInfo) { - this(name, typeInfo, true, - PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo)); - } - - private VirtualColumn(String name, TypeInfo typeInfo, boolean isHidden, ObjectInspector oi) { - this.name = name; - this.typeInfo = typeInfo; - this.isHidden = isHidden; - this.oi = oi; - } - - public static List getStatsRegistry(Configuration conf) { - List l = new ArrayList(); - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_RAWDATASIZE)) { - l.add(RAWDATASIZE); - } - return l; - } - - public static List getRegistry(Configuration conf) { - ArrayList l = new ArrayList(); - l.add(BLOCKOFFSET); - l.add(FILENAME); - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) { - l.add(ROWOFFSET); - } - l.add(ROWID); - - return l; - } - - public TypeInfo getTypeInfo() { - return typeInfo; - } - - public String getName() { - return this.name; - } - - public boolean isHidden() { - return isHidden; - } - - public boolean getIsHidden() { - return isHidden; - } - - public ObjectInspector getObjectInspector() { - return oi; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if(!(o instanceof VirtualColumn)) { - return false; - } - VirtualColumn c = (VirtualColumn) o; - return this.name.equals(c.name) - && this.typeInfo.getTypeName().equals(c.getTypeInfo().getTypeName()); - } - @Override - public int hashCode() { - int c = 19; - c = 31 * name.hashCode() + c; - return 31 * typeInfo.getTypeName().hashCode() + c; - } - public static Collection removeVirtualColumns(final Collection columns) { - Iterables.removeAll(columns, VIRTUAL_COLUMN_NAMES); - return columns; - } - - public static List removeVirtualColumnTypes(final List columnNames, - final List columnTypes) { - if (columnNames.size() != columnTypes.size()) { - throw new IllegalArgumentException("Number of column names in configuration " + - columnNames.size() + " differs from column types " + columnTypes.size()); - } - - int i = 0; - ListIterator it = columnTypes.listIterator(); - while(it.hasNext()) { - it.next(); - if (VIRTUAL_COLUMN_NAMES.contains(columnNames.get(i))) { - it.remove(); - } - ++i; - } - return columnTypes; - } - - public static StructObjectInspector getVCSObjectInspector(List vcs) { - List names = new ArrayList(vcs.size()); - List inspectors = new ArrayList(vcs.size()); - for (VirtualColumn vc : vcs) { - names.add(vc.getName()); - inspectors.add(vc.oi); - } - return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumns.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumns.java new file mode 100644 index 0000000..75041bd --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumns.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +public class VirtualColumns { + + private static final Log LOG = LogFactory.getLog(VirtualColumns.class.getName()); + + private static final Function FUNC_VC_NAME = + new Function() { + @Override + public String apply(VirtualColumn input) { + return input.getName(); + } + }; + + private static final Predicate PREDICATE_STATS_ONLY = + new Predicate() { + @Override + public boolean apply(VirtualColumn input) { + return input.isStats(); + } + }; + + public static final ImmutableSet VIRTUAL_COLUMNS = + ImmutableSet.of(VirtualColumn.FILENAME, VirtualColumn.BLOCKOFFSET, VirtualColumn.ROWOFFSET, + VirtualColumn.RAWDATASIZE, VirtualColumn.GROUPINGID, VirtualColumn.ROWID); + + public static final ImmutableSet RUNTIME_VCS = + ImmutableSet.of(VirtualColumn.FILENAME, VirtualColumn.BLOCKOFFSET, VirtualColumn.ROWOFFSET, + VirtualColumn.GROUPINGID, VirtualColumn.ROWID); + + public static final ImmutableSet VIRTUAL_COLUMN_NAMES = toNames(VIRTUAL_COLUMNS); + + public static List getStatsRegistry(Configuration conf, Deserializer deserializer) { + if (!(deserializer instanceof VirtualColumnProvider)) { + return Collections.emptyList(); + } + List vcs = getCustomVCs(deserializer); + if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_RAWDATASIZE)) { + vcs.remove(VirtualColumn.RAWDATASIZE); + } + return ImmutableList.copyOf(Iterables.filter(vcs, PREDICATE_STATS_ONLY)); + } + + public static List getRegistry(Configuration conf, Deserializer deserializer) { + Set nonStats = new LinkedHashSet(); + nonStats.add(VirtualColumn.BLOCKOFFSET); + nonStats.add(VirtualColumn.FILENAME); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) { + nonStats.add(VirtualColumn.ROWOFFSET); + } + nonStats.add(VirtualColumn.ROWID); + 
nonStats.addAll(getCustomVCs(deserializer)); + return ImmutableList.copyOf(Iterables.filter(nonStats, Predicates.not(PREDICATE_STATS_ONLY))); + } + + public static ImmutableSet toNames(Collection registry) { + return ImmutableSet.copyOf(Iterables.transform(registry, FUNC_VC_NAME)); + } + + public static Collection removeVirtualColumns(Collection columns) { + Iterables.removeAll(columns, VIRTUAL_COLUMN_NAMES); + return columns; + } + + public static StructObjectInspector getVCSObjectInspector(List vcs) { + List names = new ArrayList(vcs.size()); + List inspectors = new ArrayList(vcs.size()); + for (VirtualColumn vc : vcs) { + names.add(vc.getName()); + inspectors.add(vc.getObjectInspector()); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors); + } + + private static List getCustomVCs(Deserializer deserializer) { + if (!(deserializer instanceof VirtualColumnProvider)) { + return new ArrayList(); + } + List vcs = ((VirtualColumnProvider) deserializer).getVirtualColumns(); + if (vcs == null || vcs.isEmpty()) { + return new ArrayList(); + } + // to mutable + vcs = new ArrayList(vcs); + if (vcs.removeAll(RUNTIME_VCS)) { + LOG.warn(deserializer + " tried to override some runtime VCs, but ignored."); + } + return vcs; + } + + public static class Builder { + + private final VirtualColumn[] vc; + private final VirtualColumnProvider[] vcp; + + private final Object[] vcValues; + + private Builder(VirtualColumn[] vc, VirtualColumnProvider[] vcp) { + this.vc = vc; + this.vcp = vcp; + this.vcValues = new Object[vc.length]; + } + + public Object[] evaluate() { + for (int i = 0 ; i < vcp.length; i++) { + vcValues[i] = vcp[i].evaluate(vc[i]); + } + return vcValues; + } + } + + public static Builder builder(List vcs, + VirtualColumnProvider defaultProvider, Deserializer deserializer) { + if (defaultProvider == null) { + defaultProvider = VirtualColumnProvider.NULL; + } + VirtualColumn[] vc = new VirtualColumn[vcs.size()]; + VirtualColumnProvider[] vcp = new VirtualColumnProvider[vcs.size()]; + + Set columnNames = toNames(getCustomVCs(deserializer)); + for (int i = 0 ; i < vcs.size(); i++) { + vc[i] = vcs.get(i); + vcp[i] = columnNames.contains(vc[i].getName()) ? 
+ (VirtualColumnProvider) deserializer : defaultProvider; + } + return new Builder(vc, vcp); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index afd1738..f1f7d17 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java index 2f517f2..0e193ae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -52,7 +51,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Utils; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -61,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** * Operator factory for the rule processors for lineage. 
@@ -150,9 +150,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, fieldSchemaMap.put(col.getName(), col); } - Iterator vcs = VirtualColumn.getRegistry(pctx.getConf()).iterator(); - while (vcs.hasNext()) { - VirtualColumn vc = vcs.next(); + for (VirtualColumn vc : VirtualColumns.getRegistry(pctx.getConf(), t.getDeserializer())) { fieldSchemaMap.put(vc.getName(), new FieldSchema(vc.getName(), vc.getTypeInfo().getTypeName(), "")); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java index db99786..fe4a159 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java @@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** * The processor context for partition condition remover. This contains diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java index cbd4e6c..a7a70fe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.ppr.PartExprEvalUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -48,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index c8e6ef5..2ecb660 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.AggregationDesc; @@ -1152,7 +1152,7 @@ private boolean validateExprNodeDescRecursive(ExprNodeDesc desc) { if (desc instanceof ExprNodeColumnDesc) { ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc; // Currently, we do not support vectorized virtual columns (see HIVE-5570). 
- if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) { + if (isVirtualColumn(c.getColumn())) { LOG.info("Cannot vectorize virtual column " + c.getColumn()); return false; } @@ -1279,7 +1279,7 @@ private VectorizationContext getVectorizationContext(Operator op, VectorizationContext vContext = new VectorizationContext(); for (ColumnInfo c : rs.getSignature()) { // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560). - if (!isVirtualColumn(c)) { + if (!isVirtualColumn(c.getInternalName())) { vContext.addInitialColumn(c.getInternalName()); } } @@ -1331,14 +1331,10 @@ private void fixupParentChildOperators(Operator op, return vectorOp; } - private boolean isVirtualColumn(ColumnInfo column) { - + private boolean isVirtualColumn(String columnName) { // Not using method column.getIsVirtualCol() because partitioning columns are also // treated as virtual columns in ColumnInfo. - if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(column.getInternalName())) { - return true; - } - return false; + return VirtualColumns.VIRTUAL_COLUMN_NAMES.contains(columnName); } public void debugDisplayAllMaps(Map> allColumnVectorMaps, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java index dc5d2df..29f0269 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java @@ -30,9 +30,10 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -88,7 +89,7 @@ static synchronized public Object evalExprWithPart(ExprNodeDesc expr, ois.add(rowObjectInspector); ois.add(partObjectInspector); if (hasVC) { - ois.add(VirtualColumn.getVCSObjectInspector(vcs)); + ois.add(VirtualColumns.getVCSObjectInspector(vcs)); } StructObjectInspector rowWithPartObjectInspector = ObjectInspectorFactory .getUnionStructObjectInspector(ois); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 5d72e15..e4c869d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -105,7 +105,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl; @@ -149,6 +149,8 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -1170,7 +1172,8 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc // 3.1 Add Column info for non partion cols (Object Inspector fields) @SuppressWarnings("deprecation") - StructObjectInspector rowObjectInspector = (StructObjectInspector) tab.getDeserializer() + Deserializer deserializer = tab.getDeserializer(); + StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer .getObjectInspector(); List fields = rowObjectInspector.getAllStructFieldRefs(); ColumnInfo colInfo; @@ -1202,9 +1205,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } // 3.3 Add column info corresponding to virtual columns - Iterator vcs = VirtualColumn.getRegistry(conf).iterator(); - while (vcs.hasNext()) { - VirtualColumn vc = vcs.next(); + for (VirtualColumn vc : VirtualColumns.getRegistry(conf, deserializer)) { colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, vc.getIsHidden()); rr.put(tableAlias, vc.getName(), colInfo); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java index 3d7206b..e9691c6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnAccessInfo.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.parse; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumn; import java.util.ArrayList; import java.util.Collections; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index c2d5c8c..2e03afd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -107,7 +107,7 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.metadata.VirtualColumns; import org.apache.hadoop.hive.ql.optimizer.Optimizer; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; @@ -188,6 +188,7 @@ import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.VirtualColumn; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -9260,12 +9261,9 @@ private Operator genTablePlan(String alias, QB qb) throws SemanticException { TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias, true)); } - // put all virtual columns in RowResolver. 
- Iterator vcs = VirtualColumn.getRegistry(conf).iterator(); - // use a list for easy cumtomize + // put all virtual columns in RowResolver. use a list for easy customize List vcList = new ArrayList(); - while (vcs.hasNext()) { - VirtualColumn vc = vcs.next(); + for (VirtualColumn vc : VirtualColumns.getRegistry(conf, tab.getDeserializer())) { rwsch.put(alias, vc.getName(), new ColumnInfo(vc.getName(), vc.getTypeInfo(), alias, true, vc.getIsHidden())); vcList.add(vc); @@ -9465,10 +9463,8 @@ private void setupStats(TableScanDesc tsDesc, QBParseInfo qbp, Table tab, String tsDesc.setMaxStatsKeyPrefixLength(StatsFactory.getMaxPrefixLength(conf)); // append additional virtual columns for storing statistics - Iterator vcs = VirtualColumn.getStatsRegistry(conf).iterator(); List vcList = new ArrayList(); - while (vcs.hasNext()) { - VirtualColumn vc = vcs.next(); + for (VirtualColumn vc : VirtualColumns.getStatsRegistry(conf, tab.getDeserializer())) { rwsch.put(alias, vc.getName(), new ColumnInfo(vc.getName(), vc.getTypeInfo(), alias, true, vc.getIsHidden())); vcList.add(vc); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 4eedb14..76d62cd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -37,8 +37,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index 699b476..670b4a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.hadoop.hive.ql.exec.PTFUtils; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumn; /** * Table Scan Descriptor Currently, data is only read from a base source as part diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 3cdbc6c..727d417 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.shims.ShimLoader; diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java index b669754..102f759 100755 --- ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.CastDecimalToLong; import 
org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong; import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastTimestampToLongViaLongToLong; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index 7bb2742..9c47764 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java index 6d83f70..04b0136 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRecordIdentifier.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.junit.Test; import static org.junit.Assert.assertTrue; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 595e003..c44f6f7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.common.ValidTxnListImpl; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey; diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 22bd4b9..18db7c7 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import 
org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index d68e431..17daf28 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; diff --git serde/src/java/org/apache/hadoop/hive/serde2/AbstractDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/AbstractDeserializer.java index 869b86b..7f8c22a 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/AbstractDeserializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/AbstractDeserializer.java @@ -64,5 +64,7 @@ /** * Returns statistics collected when serializing */ - public abstract SerDeStats getSerDeStats(); + public SerDeStats getSerDeStats() { + return null; + } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java index c5e78c5..77b351f 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerDe.java @@ -86,7 +86,10 @@ public abstract Writable serialize(Object obj, ObjectInspector objInspector) /** * Returns statistics collected when serializing */ - public abstract SerDeStats getSerDeStats(); + @Deprecated + public SerDeStats getSerDeStats() { + return null; + } /** * Deserialize an object out of a Writable blob. 
In most cases, the return diff --git serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerializer.java index 570b4bb..904c559 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/AbstractSerializer.java @@ -61,5 +61,7 @@ public abstract Writable serialize(Object obj, ObjectInspector objInspector) /** * Returns statistics collected when serializing */ - public abstract SerDeStats getSerDeStats(); + public SerDeStats getSerDeStats() { + return null; + } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/MetadataTypedColumnsetSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/MetadataTypedColumnsetSerDe.java index 262c57f..377a43e 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/MetadataTypedColumnsetSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/MetadataTypedColumnsetSerDe.java @@ -227,11 +227,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe serializeCache.set(sb.toString()); return serializeCache; } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java index 83f34ce..f49483d 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/NullStructSerDe.java @@ -69,11 +69,6 @@ public ObjectInspector getObjectInspector() throws SerDeException { } @Override - public SerDeStats getSerDeStats() { - return null; - } - - @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { } diff --git serde/src/java/org/apache/hadoop/hive/serde2/OpenCSVSerde.java serde/src/java/org/apache/hadoop/hive/serde2/OpenCSVSerde.java index 44b5ae7..44e40d9 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/OpenCSVSerde.java +++ serde/src/java/org/apache/hadoop/hive/serde2/OpenCSVSerde.java @@ -200,9 +200,4 @@ public ObjectInspector getObjectInspector() throws SerDeException { public Class getSerializedClass() { return Text.class; } - - @Override - public SerDeStats getSerDeStats() { - return null; - } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/RecordIdentifier.java serde/src/java/org/apache/hadoop/hive/serde2/RecordIdentifier.java new file mode 100644 index 0000000..4875cd6 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/RecordIdentifier.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Gives the record identifier information for the current record. + */ +public class RecordIdentifier implements WritableComparable { + /** + * This is in support of {@link VirtualColumn#ROWID}. + * Contains metadata about each field in RecordIdentifier that needs to be part of ROWID, + * which is represented as a struct {@link RecordIdentifier.StructInfo}. + * Each field of RecordIdentifier that should be part of ROWID must be listed in this enum... which + * really means it should be part of VirtualColumn (so make a subclass for rowid). + */ + public static enum Field { + // note: the enum names match the field names in the struct + transactionId(TypeInfoFactory.longTypeInfo, + PrimitiveObjectInspectorFactory.javaLongObjectInspector), + bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), + rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); + public final TypeInfo fieldType; + public final ObjectInspector fieldOI; + Field(TypeInfo fieldType, ObjectInspector fieldOI) { + this.fieldType = fieldType; + this.fieldOI = fieldOI; + } + } + /** + * RecordIdentifier is passed along the operator tree as a struct. This class contains a few + * utilities for that. + */ + public static final class StructInfo { + private static final List fieldNames = new ArrayList(Field.values().length); + private static final List fieldTypes = new ArrayList(fieldNames.size()); + private static final List fieldOis = + new ArrayList(fieldNames.size()); + static { + for(Field f : Field.values()) { + fieldNames.add(f.name()); + fieldTypes.add(f.fieldType); + fieldOis.add(f.fieldOI); + } + } + public static final TypeInfo typeInfo = + TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes); + public static final ObjectInspector oi = + ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOis); + + /** + * Copies relevant fields from {@code ri} to {@code struct}. + * @param ri the record identifier to copy from; may be null + * @param struct must be of size {@code Field.values().length} + */ + public static void toArray(RecordIdentifier ri, Object[] struct) { + assert struct != null && struct.length == Field.values().length; + if(ri == null) { + Arrays.fill(struct, null); + return; + } + struct[Field.transactionId.ordinal()] = ri.getTransactionId(); + struct[Field.bucketId.ordinal()] = ri.getBucketId(); + struct[Field.rowId.ordinal()] = ri.getRowId(); + } + } + + private long transactionId; + private int bucketId; + private long rowId; + + public RecordIdentifier() { + } + + public RecordIdentifier(long transactionId, int bucket, long rowId) { + this.transactionId = transactionId; + this.bucketId = bucket; + this.rowId = rowId; + } + + /** + * Set the identifier.
+ * @param transactionId the transaction id + * @param bucketId the bucket id + * @param rowId the row id + */ + public void setValues(long transactionId, int bucketId, long rowId) { + this.transactionId = transactionId; + this.bucketId = bucketId; + this.rowId = rowId; + } + + /** + * Set this object to match the given object. + * @param other the object to copy from + */ + public void set(RecordIdentifier other) { + this.transactionId = other.transactionId; + this.bucketId = other.bucketId; + this.rowId = other.rowId; + } + + public void setRowId(long rowId) { + this.rowId = rowId; + } + + /** + * What was the original transaction id for the last row? + * @return the transaction id + */ + public long getTransactionId() { + return transactionId; + } + + /** + * What was the original bucket id for the last row? + * @return the bucket id + */ + public int getBucketId() { + return bucketId; + } + + /** + * What was the original row id for the last row? + * @return the row id + */ + public long getRowId() { + return rowId; + } + + protected int compareToInternal(RecordIdentifier other) { + if (other == null) { + return -1; + } + if (transactionId != other.transactionId) { + return transactionId < other.transactionId ? -1 : 1; + } + if (bucketId != other.bucketId) { + return bucketId < other.bucketId ? - 1 : 1; + } + if (rowId != other.rowId) { + return rowId < other.rowId ? -1 : 1; + } + return 0; + } + + @Override + public int compareTo(RecordIdentifier other) { + if (other.getClass() != RecordIdentifier.class) { + return -other.compareTo(this); + } + return compareToInternal(other); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(transactionId); + dataOutput.writeInt(bucketId); + dataOutput.writeLong(rowId); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + transactionId = dataInput.readLong(); + bucketId = dataInput.readInt(); + rowId = dataInput.readLong(); + } + + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != getClass()) { + return false; + } + RecordIdentifier oth = (RecordIdentifier) other; + return oth.transactionId == transactionId && + oth.bucketId == bucketId && + oth.rowId == rowId; + } + + @Override + public String toString() { + return "{originalTxn: " + transactionId + ", bucket: " + + bucketId + ", row: " + getRowId() + "}"; + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/RegexSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/RegexSerDe.java index 38e8b77..541c9f9 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/RegexSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/RegexSerDe.java @@ -289,10 +289,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throw new UnsupportedOperationException( "Regex SerDe doesn't support the serialize() method"); } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/TypedSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/TypedSerDe.java index f868b22..91b84da 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/TypedSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/TypedSerDe.java @@ -86,11 +86,4 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { throw new RuntimeException("not supported"); } - - @Override 
- public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - } diff --git serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumn.java serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumn.java new file mode 100644 index 0000000..e82af14 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumn.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.io.Serializable; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class VirtualColumn implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final VirtualColumn FILENAME = + new VirtualColumn("INPUT__FILE__NAME", TypeInfoFactory.stringTypeInfo, false); + public static final VirtualColumn BLOCKOFFSET = + new VirtualColumn("BLOCK__OFFSET__INSIDE__FILE", TypeInfoFactory.longTypeInfo, false); + public static final VirtualColumn ROWOFFSET = + new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK", TypeInfoFactory.longTypeInfo, false); + public static final VirtualColumn RAWDATASIZE = + new VirtualColumn("RAW__DATA__SIZE", TypeInfoFactory.longTypeInfo, true); + + /** + * GROUPINGID is used with GROUP BY GROUPING SETS, ROLLUP and CUBE. + * It composes a bit vector with "0" and "1" values for every column + * in the GROUP BY clause. The value is "1" for a row in the result + * set if that column has been aggregated in that row; otherwise the + * value is "0". Returns the decimal representation of the bit vector.
+ */ + public static final VirtualColumn GROUPINGID = + new VirtualColumn("GROUPING__ID", TypeInfoFactory.intTypeInfo, false); + + /** + * {@link RecordIdentifier} + */ + public static final VirtualColumn ROWID = new VirtualColumn("ROW__ID", + RecordIdentifier.StructInfo.typeInfo, true, false, RecordIdentifier.StructInfo.oi); + + private final String name; + private final TypeInfo typeInfo; + private final boolean isHidden; + private final boolean isStats; + private final ObjectInspector oi; + + public VirtualColumn(String name, PrimitiveTypeInfo typeInfo, boolean isStats) { + this(name, typeInfo, true, isStats, + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo)); + } + + public VirtualColumn(String name, TypeInfo typeInfo, + boolean isHidden, boolean isStats, ObjectInspector oi) { + this.name = name; + this.typeInfo = typeInfo; + this.isHidden = isHidden; + this.isStats = isStats; + this.oi = oi; + } + + public TypeInfo getTypeInfo() { + return typeInfo; + } + + public String getName() { + return this.name; + } + + public boolean isHidden() { + return isHidden; + } + + public boolean getIsHidden() { + return isHidden; + } + + public boolean isStats() { + return isStats; + } + + public ObjectInspector getObjectInspector() { + return oi; + } + + @Override + public int hashCode() { + int c = 19; + c = 31 * name.hashCode() + c; + return 31 * typeInfo.hashCode() + c; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof VirtualColumn)) { + return false; + } + VirtualColumn c = (VirtualColumn) o; + return name.equals(c.name) && typeInfo.equals(c.getTypeInfo()); + } +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumnProvider.java serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumnProvider.java new file mode 100644 index 0000000..fa73f73 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/VirtualColumnProvider.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.util.Collections; +import java.util.List; + +/** + * Implemented by a Deserializer to provide custom virtual column(s). + */ +public interface VirtualColumnProvider { + + /** + * Returns the virtual columns supported by this deserializer. + * @return the virtual columns this deserializer can evaluate + */ + List getVirtualColumns(); + + /** + * Returns the value of the given virtual column; the value must conform to the ObjectInspector of the VC.
+ * @param vc a virtual column, provided by {@link #getVirtualColumns} + * @return + */ + Object evaluate(VirtualColumn vc); + + + // dummy implementation + VirtualColumnProvider NULL = new VirtualColumnProvider() { + @Override + public List getVirtualColumns() { + return Collections.emptyList(); + } + @Override + public Object evaluate(VirtualColumn vc) { + return null; + } + }; +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 0280b0e..93c207e 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -204,12 +203,6 @@ public ObjectInspector getObjectInspector() throws SerDeException { return oi; } - @Override - public SerDeStats getSerDeStats() { - // No support for statistics. That seems to be a popular answer. - return null; - } - private AvroDeserializer getDeserializer() { if(avroDeserializer == null) { avroDeserializer = new AvroDeserializer(); diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java index 2b7fba6..63a23d1 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -877,12 +876,6 @@ private static void serializeInt(ByteStream.Output buffer, int v, boolean invert writeByte(buffer, (byte) v, invert); } - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } - public static void serializeStruct(Output byteStream, Object[] fieldData, List fieldOis, boolean[] sortableSortOrders) throws SerDeException { for (int i = 0; i < fieldData.length; i++) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java index 77c1e2f..7f6471d 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java +++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java @@ -22,10 +22,15 @@ import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -public abstract class ColumnarSerDeBase extends AbstractSerDe { +import java.util.Arrays; +import java.util.List; + 
+public abstract class ColumnarSerDeBase extends AbstractSerDe implements VirtualColumnProvider { // The object for storing row data ColumnarStructBase cachedLazyStruct; @@ -69,6 +74,21 @@ public SerDeStats getSerDeStats() { } @Override + public List getVirtualColumns() { + return Arrays.asList(VirtualColumn.RAWDATASIZE); + } + + @Override + public Object evaluate(VirtualColumn vc) { + assert vc == VirtualColumn.RAWDATASIZE; + // must be different + assert (lastOperationSerialize != lastOperationDeserialize); + + return lastOperationSerialize ? serializedSize : + cachedLazyStruct.getRawDataSerializedSize(); + } + + @Override public Class getSerializedClass() { return BytesRefArrayWritable.class; } diff --git serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java index 21cbd90..d3d168e 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -226,11 +225,4 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe ret.set(bos_.getData(), 0, bos_.getLength()); return ret; } - - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index 95e30db..2e50d0f 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -71,7 +73,7 @@ serdeConstants.ESCAPE_CHAR, serdeConstants.SERIALIZATION_ENCODING, LazySimpleSerDe.SERIALIZATION_EXTEND_NESTING_LEVELS}) -public class LazySimpleSerDe extends AbstractEncodingAwareSerDe { +public class LazySimpleSerDe extends AbstractEncodingAwareSerDe implements VirtualColumnProvider { public static final Log LOG = LogFactory.getLog(LazySimpleSerDe.class .getName()); @@ -598,6 +600,20 @@ public SerDeStats getSerDeStats() { } @Override + public List getVirtualColumns() { + return Arrays.asList(VirtualColumn.RAWDATASIZE); + } + + @Override + public Object evaluate(VirtualColumn vc) { + assert vc == VirtualColumn.RAWDATASIZE; + // must be different + assert (lastOperationSerialize != lastOperationDeserialize); + + return lastOperationSerialize ? 
serializedSize : cachedLazyStruct.getRawDataSerializedSize(); + } + + @Override protected Writable transformFromUTF8(Writable blob) { Text text = (Text)blob; return SerDeUtils.transformTextFromUTF8(text, this.charset); diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java index 62cba01..a6218bb 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.VirtualColumn; +import org.apache.hadoop.hive.serde2.VirtualColumnProvider; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; @@ -75,7 +77,7 @@ * compact format. */ @SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES}) -public class LazyBinarySerDe extends AbstractSerDe { +public class LazyBinarySerDe extends AbstractSerDe implements VirtualColumnProvider { public static final Log LOG = LogFactory.getLog(LazyBinarySerDe.class.getName()); public LazyBinarySerDe() throws SerDeException { @@ -204,6 +206,21 @@ public Writable serialize(Object obj, ObjectInspector objInspector) return serializeBytesWritable; } + @Override + public List getVirtualColumns() { + return Arrays.asList(VirtualColumn.RAWDATASIZE); + } + + @Override + public Object evaluate(VirtualColumn vc) { + assert vc == VirtualColumn.RAWDATASIZE; + // must be different + assert (lastOperationSerialize != lastOperationDeserialize); + + return lastOperationSerialize ? serializedSize : + cachedLazyBinaryStruct.getRawDataSerializedSize(); + } + public static class StringWrapper { public byte[] bytes; public int start, length; diff --git serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftDeserializer.java index b78ec69..41a0a16 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftDeserializer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftDeserializer.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.AbstractDeserializer; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.thrift.protocol.TProtocolFactory; @@ -78,10 +77,4 @@ public Object deserialize(Writable field) throws SerDeException { public ObjectInspector getObjectInspector() throws SerDeException { return tsd.getObjectInspector(); } - - @Override - public SerDeStats getSerDeStats() { - // no support for statistics - return null; - } }
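
A minimal provider-side sketch, assuming the VirtualColumnProvider and VirtualColumn classes exactly as added above. The class RawSizeTrackingSerDe, its package, and its single string column are hypothetical names invented for illustration; the patch itself only wires up LazySimpleSerDe, LazyBinarySerDe and ColumnarSerDeBase this way.

package org.apache.hadoop.hive.serde2.example;   // hypothetical package, not part of this patch

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.VirtualColumn;
import org.apache.hadoop.hive.serde2.VirtualColumnProvider;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/** Hypothetical single-column text SerDe that reports RAW__DATA__SIZE as a virtual column. */
public class RawSizeTrackingSerDe extends AbstractSerDe implements VirtualColumnProvider {

  private ObjectInspector inspector;
  private long lastRawSize;   // byte size of the row last serialized or deserialized

  @Override
  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
    // one string column, purely for illustration
    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("line"),
        Arrays.<ObjectInspector>asList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
  }

  @Override
  public Class<? extends Writable> getSerializedClass() {
    return Text.class;
  }

  @Override
  public ObjectInspector getObjectInspector() throws SerDeException {
    return inspector;
  }

  @Override
  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
    Text out = new Text(String.valueOf(obj));   // grossly simplified; a real SerDe walks the OI
    lastRawSize = out.getLength();
    return out;
  }

  @Override
  public Object deserialize(Writable blob) throws SerDeException {
    Text text = (Text) blob;
    lastRawSize = text.getLength();
    return Arrays.asList(text.toString());
  }

  // Advertise only the stats-oriented RAW__DATA__SIZE column; runtime columns such as
  // ROW__ID would be rejected by VirtualColumns.getCustomVCs() anyway.
  @Override
  public List<VirtualColumn> getVirtualColumns() {
    return Arrays.asList(VirtualColumn.RAWDATASIZE);
  }

  @Override
  public Object evaluate(VirtualColumn vc) {
    return lastRawSize;   // boxed long, matching RAW__DATA__SIZE's ObjectInspector
  }
}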
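
A consumer-side sketch, assuming the VirtualColumns helper exactly as added above; the wrapper class, the variable names and the use of the hypothetical RawSizeTrackingSerDe are illustration-only assumptions, not code from this patch.

package org.apache.hadoop.hive.ql.metadata.example;   // hypothetical package

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.VirtualColumns;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.VirtualColumn;
import org.apache.hadoop.hive.serde2.VirtualColumnProvider;
import org.apache.hadoop.hive.serde2.example.RawSizeTrackingSerDe;   // hypothetical SerDe from the sketch above

public class VirtualColumnsUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Deserializer serde = new RawSizeTrackingSerDe();

    // Planner-side registry: FILENAME, BLOCKOFFSET, ROWID (and ROWOFFSET when enabled),
    // plus any non-stats columns the SerDe itself provides.
    List<VirtualColumn> vcs = VirtualColumns.getRegistry(conf, serde);

    // Stats-side registry: stats-only columns such as RAW__DATA__SIZE, subject to
    // hive.stats.collect.rawdatasize.
    List<VirtualColumn> statsVcs = VirtualColumns.getStatsRegistry(conf, serde);

    // Columns advertised by the SerDe are evaluated by the SerDe itself; everything else
    // falls back to the default provider (the no-op NULL provider here).
    VirtualColumns.Builder builder =
        VirtualColumns.builder(statsVcs, VirtualColumnProvider.NULL, serde);
    Object[] vcValues = builder.evaluate();   // one slot per virtual column, refreshed per row
    System.out.println(vcs.size() + " runtime and " + vcValues.length + " stats VC value(s)");
  }
}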
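
A small sketch of the ROW__ID struct conversion performed by RecordIdentifier.StructInfo.toArray(), as defined in the new serde2 RecordIdentifier above; the transaction, bucket and row values are made up for the example.

import java.util.Arrays;

import org.apache.hadoop.hive.serde2.RecordIdentifier;

public class RowIdStructSketch {
  public static void main(String[] args) {
    RecordIdentifier ri = new RecordIdentifier(100L, 2, 7L);   // txn 100, bucket 2, row 7 (made up)

    // ROW__ID travels through the operator tree as a struct with one slot per Field enum value.
    Object[] rowId = new Object[RecordIdentifier.Field.values().length];
    RecordIdentifier.StructInfo.toArray(ri, rowId);

    // rowId is now {100L, 2, 7L}; RecordIdentifier.StructInfo.oi and StructInfo.typeInfo
    // describe this struct, and they back the type of VirtualColumn.ROWID.
    System.out.println(Arrays.toString(rowId));
  }
}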