diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 88e7106..5184ca8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -52,6 +52,7 @@ private int bucket; private PrintStream dummyStream = null; private boolean oldStyle = false; + private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id /** * Create the options object. @@ -164,6 +165,16 @@ Options setOldStyle(boolean value) { } /** + * Which column the row id field is in. + * @param recIdCol + * @return this + */ + public Options recIdCol(int recIdCol) { + this.recIdCol = recIdCol; + return this; + } + + /** * Temporary switch while we are in development that replaces the * implementation with a dummy one that just prints to stream. * @param stream the stream to print to @@ -214,6 +225,10 @@ public int getBucket() { return bucket; } + public int getRecIdCol() { + return recIdCol; + } + public PrintStream getDummyStream() { return dummyStream; } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 192d216..093f40b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -40,26 +40,17 @@ void insert(long currentTransaction, /** * Update an old record with a new set of values. * @param currentTransaction the current transaction id - * @param originalTransaction the row's original transaction id - * @param rowId the original row id * @param row the new values for the row * @throws IOException */ - void update(long currentTransaction, - long originalTransaction, - long rowId, - Object row) throws IOException; + void update(long currentTransaction, Object row) throws IOException; /** * Delete a row from the table. 
* @param currentTransaction the current transaction id - * @param originalTransaction the rows original transaction id - * @param rowId the original row id * @throws IOException */ - void delete(long currentTransaction, - long originalTransaction, - long rowId) throws IOException; + void delete(long currentTransaction, Object row) throws IOException; /** * Flush the current set of rows to the underlying file system, so that diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 2749c7f..5bd3f0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -211,18 +211,14 @@ public void insert(long currentTransaction, Object row) throws IOException { } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { out.println("update " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " row: " + rowId + " obj: " + - stringifyObject(row, inspector)); + " obj: " + stringifyObject(row, inspector)); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { - out.println("delete " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " row: " + rowId); + public void delete(long currentTransaction, Object row) throws IOException { + out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index b7ec309..db00886 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -31,18 +31,18 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; @@ -92,6 +92,14 @@ // because that is monotonically increasing to give new unique row ids. 
private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private StructField recIdField = null; // field to look for the record identifier in + private StructField rowIdField = null; // field inside recId to look for row id in + private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructObjectInspector rowInspector; // OI for the original row + private StructObjectInspector recIdInspector; // OI for the record identifier struct + private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier + private LongObjectInspector origTxnInspector; // OI for the original txn inside the record + // identifier static class AcidStats { long inserts; @@ -179,7 +187,7 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) { * @param rowInspector the row's object inspector * @return an object inspector for the event stream */ - static ObjectInspector createEventSchema(ObjectInspector rowInspector) { + static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { List fields = new ArrayList(); fields.add(new OrcStruct.Field("operation", PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION)); @@ -237,7 +245,9 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.bufferSize(DELTA_BUFFER_SIZE); writerOptions.stripeSize(DELTA_STRIPE_SIZE); } - writerOptions.inspector(createEventSchema(options.getInspector())); + rowInspector = (StructObjectInspector)options.getInspector(); + writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), + options.getRecIdCol()))); this.writer = OrcFile.createWriter(this.path, writerOptions); item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); @@ -247,14 +257,50 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } - private void addEvent(int operation, long
currentTransaction, - long originalTransaction, long rowId, - Object row) throws IOException { + // Find the record identifier column (if there) and return a possibly new ObjectInspector that + // will strain out the record id for the underlying writer. + private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { + if (!(inspector instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, but got a " + + inspector.getClass().getName()); + } + if (rowIdColNum < 0) { + return inspector; + } else { + RecIdStrippingObjectInspector newInspector = + new RecIdStrippingObjectInspector(inspector, rowIdColNum); + recIdField = newInspector.getRecId(); + List fields = + ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs(); + // Go by position, not field name, as field names aren't guaranteed. The order of fields + // in RecordIdentifier is transactionId, bucketId, rowId + originalTxnField = fields.get(0); + origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + rowIdField = fields.get(2); + rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); + + + recIdInspector = (StructObjectInspector) recIdField.getFieldObjectInspector(); + return newInspector; + } + } + + private void addEvent(int operation, long currentTransaction, long rowId, Object row) + throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); - this.originalTransaction.set(originalTransaction); + // If this is an insert, originalTransaction should be set to this transaction. If not, + // it will be reset by the following if anyway. 
+ long originalTransaction = currentTransaction; + if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + Object rowIdValue = rowInspector.getStructFieldData(row, recIdField); + originalTransaction = origTxnInspector.get( + recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); + rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); + } this.rowId.set(rowId); - item.setFieldValue(OrcRecordUpdater.ROW, row); + this.originalTransaction.set(originalTransaction); + item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); } @@ -264,30 +310,26 @@ public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(INSERT_OPERATION, currentTransaction, currentTransaction, - insertedRows++, row); + addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); rowCountDelta++; } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId, - row); + addEvent(UPDATE_OPERATION, currentTransaction, -1L, row); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { + public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId, - null); + addEvent(DELETE_OPERATION, currentTransaction, -1, row); rowCountDelta--; + } @Override @@ -311,7 +353,7 @@ public void 
close(boolean abort) throws IOException { fs.delete(path, false); } } else { - writer.close(); + if (writer != null) writer.close(); } if (flushLengths != null) { flushLengths.close(); } @@ -406,4 +448,67 @@ void addKey(int op, long transaction, int bucket, long rowId) { lastRowId = rowId; } } + + /** + * An ObjectInspector that will strip out the record identifier so that the underlying writer + * doesn't see it. + */ + private static class RecIdStrippingObjectInspector extends StructObjectInspector { + private StructObjectInspector wrapped; + List fields; + StructField recId; + + RecIdStrippingObjectInspector(ObjectInspector oi, int rowIdColNum) { + if (!(oi instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, " + + "but got a " + oi.getClass().getName()); + } + wrapped = (StructObjectInspector)oi; + List wrappedFields = wrapped.getAllStructFieldRefs(); + fields = new ArrayList(wrapped.getAllStructFieldRefs().size()); + for (int i = 0; i < wrappedFields.size(); i++) { + if (i == rowIdColNum) { + recId = wrappedFields.get(i); + } else { + fields.add(wrappedFields.get(i)); + } + } + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return wrapped.getStructFieldRef(fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + // For performance don't check that the fieldRef isn't recId every time, + // just assume that the caller used getAllStructFieldRefs and thus doesn't have that fieldRef + return wrapped.getStructFieldData(data, fieldRef); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return wrapped.getStructFieldsDataAsList(data); + } + + @Override + public String getTypeName() { + return wrapped.getTypeName(); + } + + @Override + public Category getCategory() { + return wrapped.getCategory(); + } + + StructField getRecId() {
+ return recId; + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index b4ce4a0..ac7fe1a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -56,6 +56,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; public class TestOrcRawRecordMerger { @@ -454,9 +455,16 @@ public void testNewBase() throws Exception { static class MyRow { Text col1; + RecordIdentifier ROW__ID; + MyRow(String val) { col1 = new Text(val); } + + MyRow(String val, long rowId, long origTxn, int bucket) { + col1 = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } } static String getValue(OrcStruct event) { @@ -533,12 +541,12 @@ public void testNewBaseAndDelta() throws Exception { // write a delta ru = of.getRecordUpdater(root, options.writingBase(false) - .minimumTransactionId(200).maximumTransactionId(200)); - ru.update(200, 0, 0, new MyRow("update 1")); - ru.update(200, 0, 2, new MyRow("update 2")); - ru.update(200, 0, 3, new MyRow("update 3")); - ru.delete(200, 0, 7); - ru.delete(200, 0, 8); + .minimumTransactionId(200).maximumTransactionId(200).recIdCol(1)); + ru.update(200, new MyRow("update 1", 0, 0, BUCKET)); + ru.update(200, new MyRow("update 2", 2, 0, BUCKET)); + ru.update(200, new MyRow("update 3", 3, 0, BUCKET)); + ru.delete(200, new MyRow("", 7, 0, BUCKET)); + ru.delete(200, new MyRow("", 8, 0, BUCKET)); ru.close(false); ValidTxnList txnList = new ValidTxnListImpl("200:"); @@ -607,13 +615,13 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + 
assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -693,7 +701,7 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -705,8 +713,7 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); - + assertNull(OrcRecordUpdater.getRow(event)); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(event)); @@ -747,6 +754,7 @@ public void testNewBaseAndDelta() throws Exception { Text mytext; float myfloat; double mydouble; + RecordIdentifier ROW__ID; BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) { this.myint = myint; @@ -754,6 +762,21 @@ public void testNewBaseAndDelta() throws Exception { this.mytext = new Text(mytext); this.myfloat = myfloat; this.mydouble = mydouble; + ROW__ID = null; + } + + BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble, + long rowId, long origTxn, int bucket) { + this.myint = myint; + this.mylong = mylong; + this.mytext = new Text(mytext); + this.myfloat = myfloat; + this.mydouble = mydouble; + ROW__ID = new RecordIdentifier(origTxn, bucket, 
rowId); + } + + BigRow(long rowId, long origTxn, int bucket) { + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } } @@ -802,16 +825,16 @@ synchronized void addedRow() throws IOException { // write a delta AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) - .bucket(BUCKET).inspector(inspector).filesystem(fs); + .bucket(BUCKET).inspector(inspector).filesystem(fs).recIdCol(5); RecordUpdater ru = of.getRecordUpdater(root, options); values = new String[]{"0.0", null, null, "1.1", null, null, null, "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 0, BUCKET)); ru.close(false); // write a delta @@ -820,10 +843,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); @@ -902,16 +925,16 @@ synchronized void addedRow() throws IOException { ru.close(false); // write a delta - options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1); + options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recIdCol(5); ru = of.getRecordUpdater(root, options); values = new String[]{"0.0", null, null, "1.1", null, null, null, "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 
0, BUCKET)); ru.close(false); // write a delta @@ -920,10 +943,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index db553f5..8cce392 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -23,8 +23,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; -import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; @@ -37,6 +37,7 @@ import java.io.File; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class TestOrcRecordUpdater { @@ -64,9 +65,18 @@ public void testAccessors() throws Exception { static class MyRow { Text field; + RecordIdentifier ROW__ID; + MyRow(String val) { field = new Text(val); + ROW__ID = null; + } + + MyRow(String val, long rowId, long origTxn, int bucket) { + field = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } + } @Test @@ -178,17 +188,19 @@ public void testUpdates() throws Exception { inspector = ObjectInspectorFactory.getReflectionObjectInspector (MyRow.class, 
ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } + int bucket = 20; AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .filesystem(fs) - .bucket(20) + .bucket(bucket) .writingBase(false) .minimumTransactionId(100) .maximumTransactionId(100) .inspector(inspector) - .reporter(Reporter.NULL); + .reporter(Reporter.NULL) + .recIdCol(1); RecordUpdater updater = new OrcRecordUpdater(root, options); - updater.update(100, 10, 30, new MyRow("update")); - updater.delete(100, 40, 60); + updater.update(100, new MyRow("update", 30, 10, bucket)); + updater.delete(100, new MyRow("", 60, 40, bucket)); assertEquals(-1L, updater.getStats().getRowCount()); updater.close(false); Path bucketPath = AcidUtils.createFilename(root, options); @@ -216,7 +228,7 @@ public void testUpdates() throws Exception { assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row)); assertEquals(20, OrcRecordUpdater.getBucket(row)); assertEquals(60, OrcRecordUpdater.getRowId(row)); - assertEquals(null, OrcRecordUpdater.getRow(row)); + assertNull(OrcRecordUpdater.getRow(row)); assertEquals(false, rows.hasNext()); } }