diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 192d216..093f40b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -40,26 +40,17 @@ void insert(long currentTransaction, /** * Update an old record with a new set of values. * @param currentTransaction the current transaction id - * @param originalTransaction the row's original transaction id - * @param rowId the original row id * @param row the new values for the row * @throws IOException */ - void update(long currentTransaction, - long originalTransaction, - long rowId, - Object row) throws IOException; + void update(long currentTransaction, Object row) throws IOException; /** * Delete a row from the table. * @param currentTransaction the current transaction id - * @param originalTransaction the rows original transaction id - * @param rowId the original row id * @throws IOException */ - void delete(long currentTransaction, - long originalTransaction, - long rowId) throws IOException; + void delete(long currentTransaction, Object row) throws IOException; /** * Flush the current set of rows to the underlying file system, so that diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 2749c7f..5bd3f0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -211,18 +211,14 @@ public void insert(long currentTransaction, Object row) throws IOException { } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { out.println("update " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " 
row: " + rowId + " obj: " + - stringifyObject(row, inspector)); + " obj: " + stringifyObject(row, inspector)); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { - out.println("delete " + path + " currTxn: " + currentTransaction + - " origTxn: " + originalTransaction + " row: " + rowId); + public void delete(long currentTransaction, Object row) throws IOException { + out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 8f17c12..e3486aa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -28,21 +28,23 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaLongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.List; @@ 
-89,6 +91,11 @@ private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); + private StructField rowIdField = null; + private StructField numericRowIdField = null; + private StructField originalTxnField = null; + private StructObjectInspector rowInspector; + private StructObjectInspector rowIdInspector; static class AcidStats { long inserts; @@ -176,7 +183,7 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) { * @param rowInspector the row's object inspector * @return an object inspector for the event stream */ - static ObjectInspector createEventSchema(ObjectInspector rowInspector) { + static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { List fields = new ArrayList(); fields.add(new OrcStruct.Field("operation", PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION)); @@ -234,7 +241,8 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.bufferSize(DELTA_BUFFER_SIZE); writerOptions.stripeSize(DELTA_STRIPE_SIZE); } - writerOptions.inspector(createEventSchema(options.getInspector())); + rowInspector = (StructObjectInspector)options.getInspector(); + writerOptions.inspector(createEventSchema(findRowId(options.getInspector()))); this.writer = OrcFile.createWriter(this.path, writerOptions); item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); @@ -244,13 +252,50 @@ static ObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(ROW_ID, rowId); } - private void addEvent(int operation, long currentTransaction, - long originalTransaction, long rowId, - Object row) throws IOException { + private ObjectInspector findRowId(ObjectInspector inspector) { + if (!(inspector instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, but got a " + + inspector.getClass().getName()); + } + 
RowIdStrippingObjectInspector newInspector = new RowIdStrippingObjectInspector(inspector); + rowIdField = newInspector.getRowId(); + if (rowIdField == null) { + return inspector; + } else { + List fields = + ((StructObjectInspector) rowIdField.getFieldObjectInspector()).getAllStructFieldRefs(); + for (StructField field : fields) { + LOG.debug("Found columnname " + field.getFieldName()); + if (RecordIdentifier.Field.transactionId.name().equalsIgnoreCase(field.getFieldName())) { + originalTxnField = field; + } else if (RecordIdentifier.Field.rowId.name().equalsIgnoreCase(field.getFieldName())) { + numericRowIdField = field; + } + } + if (originalTxnField == null || numericRowIdField == null) { + throw new RuntimeException("Serious problem: Found rowId struct but could not find rowId" + + " or originalTxn field!"); + } + rowIdInspector = (StructObjectInspector)rowIdField.getFieldObjectInspector(); + return newInspector; + } + } + + private void addEvent(int operation, long currentTransaction, long rowId, + boolean readRowIdFromRow, Object row) + throws IOException { this.operation.set(operation); this.currentTransaction.set(currentTransaction); - this.originalTransaction.set(originalTransaction); + long originalTransaction = currentTransaction; + if (readRowIdFromRow) { + Object rowIdValue = rowInspector.getStructFieldData(row, rowIdField); + originalTransaction = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, originalTxnField)); + rowId = PrimitiveObjectInspectorFactory.javaLongObjectInspector.get( + rowIdInspector.getStructFieldData(rowIdValue, numericRowIdField)); + } this.rowId.set(rowId); + this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, row); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); writer.addRow(item); @@ -261,28 +306,23 @@ public void insert(long currentTransaction, Object row) throws IOException { if 
(this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(INSERT_OPERATION, currentTransaction, currentTransaction, - insertedRows++, row); + addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, false, row); } @Override - public void update(long currentTransaction, long originalTransaction, - long rowId, Object row) throws IOException { + public void update(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId, - row); + addEvent(UPDATE_OPERATION, currentTransaction, -1L, true, row); } @Override - public void delete(long currentTransaction, long originalTransaction, - long rowId) throws IOException { + public void delete(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { insertedRows = 0; } - addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId, - null); + addEvent(DELETE_OPERATION, currentTransaction, -1, true, row); } @Override @@ -397,4 +437,62 @@ void addKey(int op, long transaction, int bucket, long rowId) { lastRowId = rowId; } } + + private static class RowIdStrippingObjectInspector extends StructObjectInspector { + private StructObjectInspector wrapped; + List fields; + StructField rowId; + + RowIdStrippingObjectInspector(ObjectInspector oi) { + if (!(oi instanceof StructObjectInspector)) { + throw new RuntimeException("Serious problem, expected a StructObjectInspector, " + + "but got a " + oi.getClass().getName()); + } + wrapped = (StructObjectInspector)oi; + fields = new ArrayList(wrapped.getAllStructFieldRefs().size()); + for (StructField field : wrapped.getAllStructFieldRefs()) { + if (VirtualColumn.ROWID.getName().equalsIgnoreCase(field.getFieldName())) { + rowId = field; + } else { + fields.add(field); + } + } + } + + @Override + public List getAllStructFieldRefs() { + 
return fields; + } + + @Override + public StructField getStructFieldRef(String fieldName) { + return wrapped.getStructFieldRef(fieldName); + } + + @Override + public Object getStructFieldData(Object data, StructField fieldRef) { + // For performance don't check that the fieldRef isn't rowId every time, + // just assume that the caller used getAllStructFieldRefs and thus doesn't have that fieldRef + return wrapped.getStructFieldData(data, fieldRef); + } + + @Override + public List getStructFieldsDataAsList(Object data) { + return wrapped.getStructFieldsDataAsList(data); + } + + @Override + public String getTypeName() { + return wrapped.getTypeName(); + } + + @Override + public Category getCategory() { + return wrapped.getCategory(); + } + + StructField getRowId() { + return rowId; + } + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index b4ce4a0..ab629dc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -454,9 +454,16 @@ public void testNewBase() throws Exception { static class MyRow { Text col1; + RecordIdentifier ROW__ID; + MyRow(String val) { col1 = new Text(val); } + + MyRow(String val, long rowId, long origTxn, int bucket) { + col1 = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } } static String getValue(OrcStruct event) { @@ -534,11 +541,11 @@ public void testNewBaseAndDelta() throws Exception { // write a delta ru = of.getRecordUpdater(root, options.writingBase(false) .minimumTransactionId(200).maximumTransactionId(200)); - ru.update(200, 0, 0, new MyRow("update 1")); - ru.update(200, 0, 2, new MyRow("update 2")); - ru.update(200, 0, 3, new MyRow("update 3")); - ru.delete(200, 0, 7); - ru.delete(200, 0, 8); + ru.update(200, new MyRow("update 1", 0, 0, BUCKET)); + ru.update(200, new MyRow("update 2", 
2, 0, BUCKET)); + ru.update(200, new MyRow("update 3", 3, 0, BUCKET)); + ru.delete(200, new MyRow("", 7, 0, BUCKET)); + ru.delete(200, new MyRow("", 8, 0, BUCKET)); ru.close(false); ValidTxnList txnList = new ValidTxnListImpl("200:"); @@ -607,13 +614,13 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -693,7 +700,7 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -705,7 +712,7 @@ public void testNewBaseAndDelta() throws Exception { assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(event)); assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); - assertEquals(null, OrcRecordUpdater.getRow(event)); + assertEquals(1, OrcRecordUpdater.getRow(event).getNumFields()); assertEquals(true, merger.next(id, event)); assertEquals(OrcRecordUpdater.INSERT_OPERATION, @@ -747,6 +754,7 @@ public void testNewBaseAndDelta() throws Exception { Text mytext; float myfloat; double mydouble; + RecordIdentifier ROW__ID; BigRow(int myint, long mylong, String 
mytext, float myfloat, double mydouble) { this.myint = myint; @@ -754,6 +762,21 @@ public void testNewBaseAndDelta() throws Exception { this.mytext = new Text(mytext); this.myfloat = myfloat; this.mydouble = mydouble; + ROW__ID = null; + } + + BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble, + long rowId, long origTxn, int bucket) { + this.myint = myint; + this.mylong = mylong; + this.mytext = new Text(mytext); + this.myfloat = myfloat; + this.mydouble = mydouble; + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + + BigRow(long rowId, long origTxn, int bucket) { + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } } @@ -808,10 +831,10 @@ synchronized void addedRow() throws IOException { "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 0, BUCKET)); ru.close(false); // write a delta @@ -820,10 +843,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); @@ -908,10 +931,10 @@ synchronized void addedRow() throws IOException { "ignore.7"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(1, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 9); + ru.delete(100, new BigRow(9, 0, BUCKET)); ru.close(false); // write a delta @@ -920,10 +943,10 @@ synchronized void addedRow() throws IOException { values = new String[]{null, null, "1.0", 
null, null, null, null, "3.1"}; for(int i=0; i < values.length; ++i) { if (values[i] != null) { - ru.update(2, 0, i, new BigRow(i, i, values[i], i, i)); + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); } } - ru.delete(100, 0, 8); + ru.delete(100, new BigRow(8, 0, BUCKET)); ru.close(false); InputFormat inf = new OrcInputFormat(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index b53bd85..761a5ce 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -63,9 +64,18 @@ public void testAccessors() throws Exception { static class MyRow { Text field; + RecordIdentifier ROW__ID; + MyRow(String val) { field = new Text(val); + ROW__ID = null; } + + MyRow(String val, long rowId, long origTxn, int bucket) { + field = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + } @Test @@ -171,17 +181,18 @@ public void testUpdates() throws Exception { inspector = ObjectInspectorFactory.getReflectionObjectInspector (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); } + int bucket = 20; AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .filesystem(fs) - .bucket(20) + .bucket(bucket) .writingBase(false) .minimumTransactionId(100) .maximumTransactionId(100) .inspector(inspector) .reporter(Reporter.NULL); RecordUpdater updater = new OrcRecordUpdater(root, options); - updater.update(100, 10, 30, new 
MyRow("update")); - updater.delete(100, 40, 60); + updater.update(100, new MyRow("update", 30, 10, bucket)); + updater.delete(100, new MyRow("", 60, 40, bucket)); updater.close(false); Path bucketPath = AcidUtils.createFilename(root, options); @@ -208,7 +219,7 @@ public void testUpdates() throws Exception { assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row)); assertEquals(20, OrcRecordUpdater.getBucket(row)); assertEquals(60, OrcRecordUpdater.getRowId(row)); - assertEquals(null, OrcRecordUpdater.getRow(row)); + assertEquals(1, OrcRecordUpdater.getRow(row).getNumFields()); assertEquals(false, rows.hasNext()); } }