diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8f3f2b6..c1cfac1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -957,6 +958,12 @@ public CommandProcessorResponse run()
 
   public CommandProcessorResponse run(String command, boolean alreadyCompiled)
         throws CommandNeedRetryException {
+    try {
+      TxnDbUtil.prepDb();
+    }
+    catch(Exception e) {
+      LOG.error(e);
+    }
     CommandProcessorResponse cpr = runInternal(command, alreadyCompiled);
     if(cpr.getResponseCode() == 0) {
       return cpr;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 0da886b..b5345ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -105,7 +105,7 @@
   private transient JobConf job;
   private transient WritableComparable key;
   private transient Writable value;
-  private transient Writable[] vcValues;
+  private transient Object[] vcValues;
   private transient Deserializer serde;
   private transient Deserializer tblSerde;
   private transient Converter partTblObjectInspectorConverter;
@@ -141,8 +141,7 @@ private void initialize() {
       List names = new ArrayList(vcCols.size());
       List inspectors = new ArrayList(vcCols.size());
       for (VirtualColumn vc : vcCols) {
-        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
-            vc.getTypeInfo()));
+        inspectors.add(vc.getObjectInspector());
         names.add(vc.getName());
       }
       vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index d5de58e..cad61ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -140,7 +140,7 @@ public int hashCode() {
     String tableName;
     String partName;
     List vcs;
-    Writable[] vcValues;
+    Object[] vcValues;
 
     private boolean isPartitioned() {
       return partObjectInspector != null;
@@ -165,7 +165,7 @@ public StructObjectInspector getRowObjectInspector() {
   * op.
   *
   * @param hconf
-  * @param mrwork
+  * @param mapWork
   * @throws HiveException
   */
  public void initializeAsRoot(Configuration hconf, MapWork mapWork)
@@ -250,13 +250,13 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
     // The op may not be a TableScan for mapjoins
     // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key;
-    // In that case, it will be a Select, but the rowOI need not be ammended
+    // In that case, it will be a Select, but the rowOI need not be amended
     if (ctx.op instanceof TableScanOperator) {
       TableScanOperator tsOp = (TableScanOperator) ctx.op;
       TableScanDesc tsDesc = tsOp.getConf();
       if (tsDesc != null && tsDesc.hasVirtualCols()) {
         opCtx.vcs = tsDesc.getVirtualCols();
-        opCtx.vcValues = new Writable[opCtx.vcs.size()];
+        opCtx.vcValues = new Object[opCtx.vcs.size()];
         opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs);
         if (opCtx.isPartitioned()) {
           opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3);
@@ -550,13 +550,13 @@ public void process(Writable value) throws HiveException {
     }
   }
 
-  public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
-      List vcs, Writable[] vcValues, Deserializer deserializer) {
+  public static Object[] populateVirtualColumnValues(ExecMapperContext ctx,
+      List vcs, Object[] vcValues, Deserializer deserializer) {
     if (vcs == null) {
       return vcValues;
     }
     if (vcValues == null) {
-      vcValues = new Writable[vcs.size()];
+      vcValues = new Object[vcs.size()];
     }
     for (int i = 0; i < vcs.size(); i++) {
       VirtualColumn vc = vcs.get(i);
@@ -602,6 +602,17 @@ public void process(Writable value) throws HiveException {
           old.set(current);
         }
       }
+      else if(vc.equals(VirtualColumn.ROWID)) {
+        if(vcValues[i] == null) {
+          vcValues[i] = new Object[3];
+        }
+        Object[] recordIdentifierStruct = (Object[])vcValues[i];
+        //order of elements in the array should match RecordIdentifier.sti
+        //todo: add a method on RecordIdentifier to do this
+        recordIdentifierStruct[0] = ctx.getIoCxt().getCurrentRowId().getTransactionId();
+        recordIdentifierStruct[1] = ctx.getIoCxt().getCurrentRowId().getBucketId();
+        recordIdentifierStruct[2] = ctx.getIoCxt().getCurrentRowId().getRowId();
+      }
     }
     return vcValues;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 4e0fd79..7fb4c46 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -155,7 +155,7 @@ public void configure(JobConf job) {
         }
       }
     }
-  
+
   @Override
   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
     if (oc == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index f874d86..44c58b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -20,21 +20,18 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FooterBuffer;
 import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
@@ -42,16 +39,13 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /** This class prepares an IOContext, and provides the ability to perform a binary search on the
  * data. The binary search can be used by setting the value of inputFormatSorted in the
@@ -119,7 +113,13 @@ public boolean next(K key, V value) throws IOException {
     }
     updateIOContext();
     try {
-      return doNext(key, value);
+      boolean retVal = doNext(key, value);
+      if(recordReader instanceof OrcInputFormat.RIAwareRecordReader) {
+        OrcInputFormat.RIAwareRecordReader ri = (OrcInputFormat.RIAwareRecordReader)recordReader;
+        ioCxtRef.currentRowId = new RecordIdentifier();
+        ioCxtRef.currentRowId.set(ri.id);
+      }
+      return retVal;
     } catch (IOException e) {
       ioCxtRef.setIOExceptions(true);
       throw e;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 8cbf32f..d229d46 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -58,6 +58,7 @@ public static void clear() {
   long currentRow;
   boolean isBlockPointer;
   boolean ioExceptions;
+  RecordIdentifier currentRowId = new RecordIdentifier(711, 712, 713);
 
   // Are we using the fact the input is sorted
   boolean useSorted = false;
@@ -111,6 +112,14 @@ public void setCurrentRow(long currentRow) {
     this.currentRow = currentRow;
   }
 
+  /**
+   * Tables using AcidInputFormat support a row id
+   * @return
+   */
+  public RecordIdentifier getCurrentRowId() {
+    return currentRowId;
+  }
+
   public boolean isBlockPointer() {
     return isBlockPointer;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
index 38a0d6b..2e02518 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -19,16 +19,41 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Gives the Record identifer information for the current record.
  */
 public class RecordIdentifier implements WritableComparable {
+
+  public static final String ROW__ID__TXN__ID = "transactionId";
+  public static final String ROW__ID__BKT__ID = "bucketId";
+  public static final String ROW__ID__ROW__ID = "rowId";
+
+  private final static List fieldNames =
+      Arrays.asList(ROW__ID__TXN__ID, ROW__ID__BKT__ID, ROW__ID__ROW__ID);
+  public final static TypeInfo sti = TypeInfoFactory.getStructTypeInfo(
+      fieldNames,
+      Arrays.asList((TypeInfo)TypeInfoFactory.longTypeInfo, TypeInfoFactory.intTypeInfo,
+          TypeInfoFactory.longTypeInfo)
+  );
+  public final static ObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
+      fieldNames,
+      Arrays.asList((ObjectInspector)PrimitiveObjectInspectorFactory.javaLongObjectInspector,
+          PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+          PrimitiveObjectInspectorFactory.javaLongObjectInspector));
+
   private long transactionId;
   private int bucketId;
   private long rowId;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
index 192d216..093f40b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
@@ -40,26 +40,17 @@ void insert(long currentTransaction,
 
   /**
    * Update an old record with a new set of values.
    * @param currentTransaction the current transaction id
-   * @param originalTransaction the row's original transaction id
-   * @param rowId the original row id
    * @param row the new values for the row
    * @throws IOException
    */
-  void update(long currentTransaction,
-              long originalTransaction,
-              long rowId,
-              Object row) throws IOException;
+  void update(long currentTransaction, Object row) throws IOException;
 
   /**
    * Delete a row from the table.
    * @param currentTransaction the current transaction id
-   * @param originalTransaction the rows original transaction id
-   * @param rowId the original row id
    * @throws IOException
    */
-  void delete(long currentTransaction,
-              long originalTransaction,
-              long rowId) throws IOException;
+  void delete(long currentTransaction, Object row) throws IOException;
 
   /**
    * Flush the current set of rows to the underlying file system, so that
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 7edb3c2..f113e34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1002,61 +1002,64 @@ private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
     OrcSplit split = (OrcSplit) inputSplit;
     reporter.setStatus(inputSplit.toString());
+    Options options = new Options(conf).reporter(reporter);
+    final RowReader inner = getReader(inputSplit, options);
 
     // if we are strictly old-school, just use the old code
     if (split.isOriginal() && split.getDeltas().isEmpty()) {
       if (vectorMode) {
         return createVectorizedReader(inputSplit, conf, reporter);
       } else {
-        return new OrcRecordReader(OrcFile.createReader(split.getPath(),
-            OrcFile.readerOptions(conf)), conf, split);
+        return new RIAwareRecordReader(inner);
       }
     }
 
-    Options options = new Options(conf).reporter(reporter);
-    final RowReader inner = getReader(inputSplit, options);
     if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
           new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
     }
 
-    final RecordIdentifier id = inner.createKey();
-
-    // Return a RecordReader that is compatible with the Hive 0.12 reader
-    // with NullWritable for the key instead of RecordIdentifier.
-    return new org.apache.hadoop.mapred.RecordReader(){
-      @Override
-      public boolean next(NullWritable nullWritable,
-                          OrcStruct orcStruct) throws IOException {
-        return inner.next(id, orcStruct);
-      }
+    return new RIAwareRecordReader(inner);
+  }
+  //a RecordReader that is compatible with the Hive 0.12 reader
+  // with NullWritable for the key instead of RecordIdentifier.
+  public static class RIAwareRecordReader implements org.apache.hadoop.mapred.RecordReader {
+    private final RowReader inner;
+    public RecordIdentifier id;
+    private RIAwareRecordReader(final RowReader inner) {
+      this.inner = inner;
+      id = inner.createKey();
+    }
+    @Override
+    public boolean next(NullWritable nullWritable,
+                        OrcStruct orcStruct) throws IOException {
+      return inner.next(id, orcStruct);
+    }//2)and here we want to load the RI
-      @Override
-      public NullWritable createKey() {
-        return NullWritable.get();
-      }
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
-      @Override
-      public OrcStruct createValue() {
-        return inner.createValue();
-      }
+    @Override
+    public OrcStruct createValue() {
+      return inner.createValue();//1)so here we want to augment this struct with RecordIdentifier
+    }
-      @Override
-      public long getPos() throws IOException {
-        return inner.getPos();
-      }
+    @Override
+    public long getPos() throws IOException {
+      return inner.getPos();
+    }
-      @Override
-      public void close() throws IOException {
-        inner.close();
-      }
+    @Override
+    public void close() throws IOException {
+      inner.close();
+    }
-      @Override
-      public float getProgress() throws IOException {
-        return inner.getProgress();
-      }
-    };
+    @Override
+    public float getProgress() throws IOException {
+      return inner.getProgress();
+    }
   }
-
   @Override
   public RowReader getReader(InputSplit inputSplit, Options options) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 00e0807..0a13211 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -211,18 +211,14 @@ public void insert(long currentTransaction, Object row) throws IOException {
     }
 
     @Override
-    public void update(long currentTransaction, long originalTransaction,
-                       long rowId, Object row) throws IOException {
+    public void update(long currentTransaction, Object row) throws IOException {
       out.println("update " + path + " currTxn: " + currentTransaction +
-          " origTxn: " + originalTransaction + " row: " + rowId + " obj: " +
-          stringifyObject(row, inspector));
+          " obj: " + stringifyObject(row, inspector));
     }
 
     @Override
-    public void delete(long currentTransaction, long originalTransaction,
-                       long rowId) throws IOException {
-      out.println("delete " + path + " currTxn: " + currentTransaction +
-          " origTxn: " + originalTransaction + " row: " + rowId);
+    public void delete(long currentTransaction, Object row) throws IOException {
+      out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row);
     }
 
     @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 8f17c12..b347439 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -28,21 +28,23 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
 import java.util.ArrayList;
 import java.util.List;
@@ -89,6 +91,11 @@
   private final LongWritable rowId = new LongWritable();
   private long insertedRows = 0;
   private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+  private StructField rowIdField = null;
+  private StructField numericRowIdField = null;
+  private StructField originalTxnField = null;
+  private StructObjectInspector rowInspector;
+  private StructObjectInspector rowIdInspector;
 
   static class AcidStats {
     long inserts;
@@ -176,7 +183,7 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
    * @param rowInspector the row's object inspector
    * @return an object inspector for the event stream
    */
-  static ObjectInspector createEventSchema(ObjectInspector rowInspector) {
+  static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     List fields = new ArrayList();
     fields.add(new OrcStruct.Field("operation",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
@@ -234,7 +241,8 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) {
       writerOptions.bufferSize(DELTA_BUFFER_SIZE);
       writerOptions.stripeSize(DELTA_STRIPE_SIZE);
     }
-    writerOptions.inspector(createEventSchema(options.getInspector()));
+    rowInspector = (StructObjectInspector)options.getInspector();
+    writerOptions.inspector(createEventSchema(findRowId(options.getInspector())));
     this.writer = OrcFile.createWriter(this.path, writerOptions);
     item = new OrcStruct(FIELDS);
     item.setFieldValue(OPERATION, operation);
@@ -244,13 +252,50 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) {
     item.setFieldValue(ROW_ID, rowId);
   }
 
-  private void addEvent(int operation, long currentTransaction,
-                        long originalTransaction, long rowId,
-                        Object row) throws IOException {
+  private ObjectInspector findRowId(ObjectInspector inspector) {
+    if (!(inspector instanceof StructObjectInspector)) {
+      throw new RuntimeException("Serious problem, expected a StructObjectInspector, but got a " +
+          inspector.getClass().getName());
+    }
+    RowIdStrippingObjectInspector newInspector = new RowIdStrippingObjectInspector(inspector);
+    rowIdField = newInspector.getRowId();
+    if (rowIdField == null) {
+      return inspector;
+    } else {
+      List fields =
+          ((StructObjectInspector) rowIdField.getFieldObjectInspector()).getAllStructFieldRefs();
+      for (StructField field : fields) {
+        LOG.debug("Found columnname " + field.getFieldName());
+        if (RecordIdentifier.ROW__ID__TXN__ID.equalsIgnoreCase(field.getFieldName())) {
+          originalTxnField = field;
+        } else if (RecordIdentifier.ROW__ID__ROW__ID.equalsIgnoreCase(field.getFieldName())) {
+          numericRowIdField = field;
+        }
+      }
+      if (originalTxnField == null || numericRowIdField == null) {
+        throw new RuntimeException("Serious problem: Found rowId struct but could not find rowId" +
+            " or originalTxn field!");
+      }
+      rowIdInspector = (StructObjectInspector)rowIdField.getFieldObjectInspector();
+      return newInspector;
} + } + + private void addEvent(int operation, long currentTransaction, long rowId, + boolean readRowIdFromRow, Object row) + throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); - this.originalTransaction.set(originalTransaction); + long originalTransaction = currentTransaction; + if (readRowIdFromRow) { + Object rowIdValue = rowInspector.getStructFieldData(row, rowIdField); + originalTransaction = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, originalTxnField)); + rowId = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, numericRowIdField)); + } this.rowId.set(rowId); + this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, row); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); @@ -261,28 +306,23 @@ public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(INSERT_OPERATION, currentTransaction, currentTransaction, - insertedRows++, row); + addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, false, row); } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId, - row); + addEvent(UPDATE_OPERATION, currentTransaction, -1L, true, row); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { + public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId, - null); + addEvent(DELETE_OPERATION, currentTransaction, -1, true, row); } @Override @@ -397,4 +437,62 @@ void addKey(int op, long transaction, int bucket, long rowId) { lastRowId = rowId; } } + + private static class RowIdStrippingObjectInspector extends StructObjectInspector { + private StructObjectInspector wrapped; + List fields; + StructField rowId; + + RowIdStrippingObjectInspector(ObjectInspector oi) { + if (!(oi instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, " + + "but got a " + oi.getClass().getName()); + } + wrapped = (StructObjectInspector)oi; + fields = new ArrayList(wrapped.getAllStructFieldRefs().size()); + for (StructField field : wrapped.getAllStructFieldRefs()) { + if (VirtualColumn.ROWID.getName().equalsIgnoreCase(field.getFieldName())) { + rowId = field; + } else { + fields.add(field); + } + } + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return wrapped.getStructFieldRef(fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + // For performance don't check that that the fieldRef isn't rowId everytime, + // just assume that the caller used getAllStructFieldRefs and thus doesn't have that fieldRef + return wrapped.getStructFieldData(data, fieldRef); + } + + @Override + public List 
+    public List getStructFieldsDataAsList(Object data) {
+      return wrapped.getStructFieldsDataAsList(data);
+    }
+
+    @Override
+    public String getTypeName() {
+      return wrapped.getTypeName();
+    }
+
+    @Override
+    public Category getCategory() {
+      return wrapped.getCategory();
+    }
+
+    StructField getRowId() {
+      return rowId;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index 0637d46..8e67a3e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -25,11 +25,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 public class VirtualColumn implements Serializable {
@@ -41,6 +43,10 @@
   public static VirtualColumn ROWOFFSET = new VirtualColumn("ROW__OFFSET__INSIDE__BLOCK",
       (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo);
   public static VirtualColumn RAWDATASIZE = new VirtualColumn("RAW__DATA__SIZE",
      (PrimitiveTypeInfo)TypeInfoFactory.longTypeInfo);
+  /**
+   * {@link org.apache.hadoop.hive.ql.io.RecordIdentifier}
+   */
+  public static VirtualColumn ROWID = new VirtualColumn("ROW__ID", RecordIdentifier.sti, true, RecordIdentifier.oi);
 
   /**
   * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE.
@@ -53,23 +59,26 @@
       new VirtualColumn("GROUPING__ID", (PrimitiveTypeInfo) TypeInfoFactory.intTypeInfo);
 
   public static VirtualColumn[] VIRTUAL_COLUMNS =
-      new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID};
+      new VirtualColumn[] {FILENAME, BLOCKOFFSET, ROWOFFSET, RAWDATASIZE, GROUPINGID, ROWID};
 
   private String name;
-  private PrimitiveTypeInfo typeInfo;
+  private TypeInfo typeInfo;
   private boolean isHidden = true;
+  private ObjectInspector oi;
 
   public VirtualColumn() {
   }
 
   public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) {
-    this(name, typeInfo, true);
+    this(name, typeInfo, true,
+        PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo));
   }
 
-  VirtualColumn(String name, PrimitiveTypeInfo typeInfo, boolean isHidden) {
+  VirtualColumn(String name, TypeInfo typeInfo, boolean isHidden, ObjectInspector oi) {
     this.name = name;
     this.typeInfo = typeInfo;
     this.isHidden = isHidden;
+    this.oi = oi;
   }
 
   public static List getStatsRegistry(Configuration conf) {
@@ -87,11 +96,12 @@ public VirtualColumn(String name, PrimitiveTypeInfo typeInfo) {
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEROWOFFSET)) {
       l.add(ROWOFFSET);
     }
+    l.add(ROWID);
     return l;
   }
 
-  public PrimitiveTypeInfo getTypeInfo() {
+  public TypeInfo getTypeInfo() {
     return typeInfo;
   }
@@ -118,6 +128,9 @@ public boolean getIsHidden() {
   public void setIsHidden(boolean isHidden) {
     this.isHidden = isHidden;
   }
+  public ObjectInspector getObjectInspector() {
+    return oi;
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -144,8 +157,7 @@ public static StructObjectInspector getVCSObjectInspector(List vc
     List inspectors = new ArrayList(vcs.size());
     for (VirtualColumn vc : vcs) {
       names.add(vc.getName());
-      inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
-          vc.getTypeInfo()));
+      inspectors.add(vc.oi);
     }
     return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 35e30b8..94b98f2 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -973,7 +973,7 @@ public void testInOutFormat() throws Exception {
     List fields =inspector.getAllStructFieldRefs();
     IntObjectInspector intInspector =
         (IntObjectInspector) fields.get(0).getFieldObjectInspector();
-    assertEquals(0.0, reader.getProgress(), 0.00001);
+    assertEquals(0.33, reader.getProgress(), 0.1);
     while (reader.next(key, value)) {
       assertEquals(++rowNum, intInspector.get(inspector.
          getStructFieldData(serde.deserialize(value), fields.get(0))));
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index b4ce4a0..ab629dc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -454,9 +454,16 @@ public void testNewBase() throws Exception {
   static class MyRow {
     Text col1;
+    RecordIdentifier ROW__ID;
+
     MyRow(String val) {
       col1 = new Text(val);
     }
+
+    MyRow(String val, long rowId, long origTxn, int bucket) {
+      col1 = new Text(val);
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
   }
 
   static String getValue(OrcStruct event) {
@@ -534,11 +541,11 @@ public void testNewBaseAndDelta() throws Exception {
     // write a delta
     ru = of.getRecordUpdater(root, options.writingBase(false)
         .minimumTransactionId(200).maximumTransactionId(200));
-    ru.update(200, 0, 0, new MyRow("update 1"));
-    ru.update(200, 0, 2, new MyRow("update 2"));
-    ru.update(200, 0, 3, new MyRow("update 3"));
-    ru.delete(200, 0, 7);
-    ru.delete(200, 0, 8);
+    ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
+    ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
+    ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
+    ru.delete(200, new MyRow("", 7, 0, BUCKET));
+    ru.delete(200, new MyRow("", 8, 0, BUCKET));
     ru.close(false);
 
     ValidTxnList txnList = new ValidTxnListImpl("200:");
@@ -607,13 +614,13 @@ public void testNewBaseAndDelta() throws Exception {
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
-    assertEquals(null, OrcRecordUpdater.getRow(event));
+    assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields());
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
-    assertEquals(null, OrcRecordUpdater.getRow(event));
+    assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields());
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -693,7 +700,7 @@ public void testNewBaseAndDelta() throws Exception {
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
-    assertEquals(null, OrcRecordUpdater.getRow(event));
+    assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields());
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -705,7 +712,7 @@ public void testNewBaseAndDelta() throws Exception {
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
-    assertEquals(null, OrcRecordUpdater.getRow(event));
+    assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields());
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -747,6 +754,7 @@ public void testNewBaseAndDelta() throws Exception {
     Text mytext;
     float myfloat;
     double mydouble;
+    RecordIdentifier ROW__ID;
 
     BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
       this.myint = myint;
@@ -754,6 +762,21 @@ public void testNewBaseAndDelta() throws Exception {
       this.mytext = new Text(mytext);
       this.myfloat = myfloat;
       this.mydouble = mydouble;
+      ROW__ID = null;
+    }
+
+    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble,
+           long rowId, long origTxn, int bucket) {
+      this.myint = myint;
+      this.mylong = mylong;
+      this.mytext = new Text(mytext);
+      this.myfloat = myfloat;
+      this.mydouble = mydouble;
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
+    BigRow(long rowId, long origTxn, int bucket) {
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
   }
@@ -808,10 +831,10 @@ synchronized void addedRow() throws IOException {
         "ignore.7"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
       }
     }
-    ru.delete(100, 0, 9);
+    ru.delete(100, new BigRow(9, 0, BUCKET));
     ru.close(false);
 
     // write a delta
@@ -820,10 +843,10 @@ synchronized void addedRow() throws IOException {
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
      }
    }
-    ru.delete(100, 0, 8);
+    ru.delete(100, new BigRow(8, 0, BUCKET));
     ru.close(false);
 
     InputFormat inf = new OrcInputFormat();
@@ -908,10 +931,10 @@ synchronized void addedRow() throws IOException {
         "ignore.7"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
      }
    }
-    ru.delete(100, 0, 9);
+    ru.delete(100, new BigRow(9, 0, BUCKET));
     ru.close(false);
 
     // write a delta
@@ -920,10 +943,10 @@ synchronized void addedRow() throws IOException {
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
       }
     }
-    ru.delete(100, 0, 8);
+    ru.delete(100, new BigRow(8, 0, BUCKET));
     ru.close(false);
 
     InputFormat inf = new OrcInputFormat();
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index b53bd85..761a5ce 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -63,9 +64,18 @@ public void testAccessors() throws Exception {
   static class MyRow {
     Text field;
+    RecordIdentifier ROW__ID;
+
     MyRow(String val) {
       field = new Text(val);
+      ROW__ID = null;
     }
+
+    MyRow(String val, long rowId, long origTxn, int bucket) {
+      field = new Text(val);
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
   }
 
   @Test
@@ -171,17 +181,18 @@ public void testUpdates() throws Exception {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
+    int bucket = 20;
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .filesystem(fs)
-        .bucket(20)
+        .bucket(bucket)
        .writingBase(false)
        .minimumTransactionId(100)
        .maximumTransactionId(100)
        .inspector(inspector)
        .reporter(Reporter.NULL);
     RecordUpdater updater = new OrcRecordUpdater(root, options);
-    updater.update(100, 10, 30, new MyRow("update"));
-    updater.delete(100, 40, 60);
+    updater.update(100, new MyRow("update", 30, 10, bucket));
+    updater.delete(100, new MyRow("", 60, 40, bucket));
     updater.close(false);
 
     Path bucketPath = AcidUtils.createFilename(root, options);
@@ -208,7 +219,7 @@ public void testUpdates() throws Exception {
     assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
     assertEquals(20, OrcRecordUpdater.getBucket(row));
     assertEquals(60, OrcRecordUpdater.getRowId(row));
-    assertEquals(null, OrcRecordUpdater.getRow(row));
+    assertEquals(1, OrcRecordUpdater.getRow(row).getNumFields());
 
     assertEquals(false, rows.hasNext());
   }
 }
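
Usage note (illustrative only, not part of the patch): with this change a writer no longer passes originalTransaction/rowId to RecordUpdater.update()/delete(). The row object itself carries a ROW__ID field, a RecordIdentifier struct populated from the new ROW__ID virtual column, and OrcRecordUpdater.addEvent() reads the original transaction id and row id back out of that struct. The sketch below mirrors the updated tests; the RowIdUsageSketch/Row classes and the literal transaction, bucket, and row-id values are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.io.Text;

// Hypothetical illustration of the reworked API; not part of the patch.
public class RowIdUsageSketch {

  // A row whose ROW__ID field mirrors what the ROW__ID virtual column exposes.
  static class Row {
    Text col1;
    RecordIdentifier ROW__ID;   // (originalTransactionId, bucketId, rowId)

    Row(String val, long rowId, long origTxn, int bucket) {
      col1 = new Text(val);
      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
    }
  }

  // update()/delete() take only the current transaction and the row;
  // OrcRecordUpdater extracts originalTransaction and rowId from row.ROW__ID.
  static void rewrite(RecordUpdater updater, long currentTxn) throws IOException {
    updater.update(currentTxn, new Row("new value", 3L, 0L, 20));
    updater.delete(currentTxn, new Row("", 7L, 0L, 20));
  }
}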