diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
new file mode 100644
index 0000000..9e16da8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+public interface AcidInputFormat
+    extends InputFormat, InputFormatChecker {
+
+  // TODO replace this type with Alan's class for transaction sets.
+  static class ValidTransactionSet {}
+
+  public static class Options {
+    private final Configuration conf;
+    private Reporter reporter;
+    private ValidTransactionSet transactions;
+
+    public Options(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Options reporter(Reporter reporter) {
+      this.reporter = reporter;
+      return this;
+    }
+
+    public Options validTransactions(ValidTransactionSet transactions) {
+      this.transactions = transactions;
+      return this;
+    }
+
+    protected Configuration getConfiguration() {
+      return conf;
+    }
+
+    protected Reporter getReporter() {
+      return reporter;
+    }
+
+    protected ValidTransactionSet getValidTransactions() {
+      return transactions;
+    }
+  }
+
+  RecordReader getReader(InputSplit split,
+                         Options options
+                         ) throws IOException;
+
+  RecordReader
+      getVectorizedReader(InputSplit split,
+                          Options options) throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
new file mode 100644
index 0000000..0094ef8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public interface AcidOutputFormat extends HiveOutputFormat {
+
+  public static class Options {
+    private final Configuration configuration;
+    private TypeInfo typeInfo;
+    private boolean writingBase = false;
+    private boolean isCompressed = false;
+    private Properties properties;
+    private Reporter reporter;
+    private long minimumTransactionId;
+    private long maximumTransactionId;
+    private int bucket;
+
+    public Options(Configuration conf) {
+      this.configuration = conf;
+    }
+
+    public Options typeInfo(TypeInfo typeInfo) {
+      this.typeInfo = typeInfo;
+      return this;
+    }
+
+    public Options writingBase(boolean val) {
+      this.writingBase = val;
+      return this;
+    }
+
+    public Options isCompressed(boolean isCompressed) {
+      this.isCompressed = isCompressed;
+      return this;
+    }
+
+    public Options tableProperties(Properties properties) {
+      this.properties = properties;
+      return this;
+    }
+
+    public Options reporter(Reporter reporter) {
+      this.reporter = reporter;
+      return this;
+    }
+
+    public Options minimumTransactionId(long min) {
+      this.minimumTransactionId = min;
+      return this;
+    }
+
+    public Options maximumTransactionId(long max) {
+      this.maximumTransactionId = max;
+      return this;
+    }
+
+    public Options bucket(int bucket) {
+      this.bucket = bucket;
+      return this;
+    }
+
+    protected Configuration getConfiguration() {
+      return configuration;
+    }
+
+    protected TypeInfo getRowType() {
+      return typeInfo;
+    }
+
+    protected boolean isCompressed() {
+      return isCompressed;
+    }
+
+    protected Properties getTableProperties() {
+      return properties;
+    }
+
+    protected Reporter getReporter() {
+      return reporter;
+    }
+
+    protected long getMinimumTransactionId() {
+      return minimumTransactionId;
+    }
+
+    protected long getMaximumTransactionId() {
+      return maximumTransactionId;
+    }
+
+    protected boolean isWritingBase() {
+      return writingBase;
+    }
+
+    protected int getBucket() {
+      return bucket;
+    }
+  }
+
+  /**
+   * Create a RecordUpdater for inserting, updating, or deleting records.
+   * @param path the partition directory name
+   * @param options the options for the writer
+   * @return the RecordUpdater for the output file
+   */
+  RecordUpdater getRecordUpdater(Path path,
+                                 Options options) throws IOException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 647a9a6..9d8fb19 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -44,11 +44,8 @@
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
new file mode 100644
index 0000000..8fb22de
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Gives the record identifier information for the current record.
+ */
+public class RecordIdentifier implements WritableComparable {
+  private long transactionId;
+  private int bucketId;
+  private long rowId;
+
+  /**
+   * Set the identifier.
+   * @param transactionId the transaction id
+   * @param bucketId the bucket id
+   * @param rowId the row id
+   */
+  public void setValues(long transactionId, int bucketId, long rowId) {
+    this.transactionId = transactionId;
+    this.bucketId = bucketId;
+    this.rowId = rowId;
+  }
+
+  /**
+   * What was the original transaction id for the last row?
+   * @return the transaction id
+   */
+  public long getRowTransactionId() {
+    return transactionId;
+  }
+
+  /**
+   * What was the original bucket id for the last row?
+   * @return the bucket id
+   */
+  int getRowBucketId() {
+    return bucketId;
+  }
+
+  /**
+   * What was the original row id for the last row?
+   * @return the row id
+   */
+  long getRowId() {
+    return rowId;
+  }
+
+  @Override
+  public int compareTo(RecordIdentifier recordIdentifier) {
+    if (transactionId != recordIdentifier.transactionId) {
+      return transactionId < recordIdentifier.transactionId ? -1 : 1;
+    }
+    if (bucketId != recordIdentifier.bucketId) {
+      return bucketId < recordIdentifier.bucketId ? -1 : 1;
+    }
+    if (rowId != recordIdentifier.rowId) {
+      return rowId < recordIdentifier.rowId ? -1 : 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    throw new UnsupportedOperationException("Can't write RecordIdentifier");
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    throw new UnsupportedOperationException("Can't read RecordIdentifier");
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
new file mode 100644
index 0000000..c45eb8c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.io.IOException;
+
+/**
+ * API for supporting updating records.
+ */
+public interface RecordUpdater {
+
+  /**
+   * Insert a new record into the table.
+   * @param currentTransaction the transaction id of the current transaction.
+   * @param bucket the bucket of the row
+   * @param row the row of data to insert
+   * @param inspector the object inspector for the row
+   * @throws IOException
+   */
+  void insert(long currentTransaction,
+              int bucket,
+              Object row,
+              ObjectInspector inspector) throws IOException;
+
+  /**
+   * Update an old record with a new set of values.
+   * @param currentTransaction the current transaction id
+   * @param originalTransaction the row's original transaction id
+   * @param originalBucket the row's original bucket id
+   * @param rowId the original row id
+   * @param row the new values for the row
+   * @param inspector the object inspector for the row
+   * @throws IOException
+   */
+  void update(long currentTransaction,
+              long originalTransaction,
+              int originalBucket,
+              long rowId,
+              Object row,
+              ObjectInspector inspector) throws IOException;
+
+  /**
+   * Delete a row from the table.
+   * @param currentTransaction the current transaction id
+   * @param originalTransaction the row's original transaction id
+   * @param originalBucket the row's original bucket id
+   * @param rowId the original row id
+   * @throws IOException
+   */
+  void delete(long currentTransaction,
+              long originalTransaction,
+              int originalBucket,
+              long rowId) throws IOException;
+
+  /**
+   * Flush the current set of rows to the underlying file system, so that
+   * they are available to readers.
+   * @throws IOException
+   */
+  void flush() throws IOException;
+
+  /**
+   * Close this updater. No further calls are legal after this.
+   * @param abort Can the data since the last flush be discarded?
+   * @throws IOException
+   */
+  void close(boolean abort) throws IOException;
+
+  /**
+   * Returns the statistics information.
+   * @return SerDeStats
+   */
+  SerDeStats getStats();
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a5747a6..d490787 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -42,7 +42,9 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.Metadata;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileGenerator;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator;
@@ -74,9 +76,8 @@
  * A MapReduce/Hive input format for ORC files.
 */
 public class OrcInputFormat implements InputFormat,
-  InputFormatChecker, VectorizedInputFormatInterface {
-
-  VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+  InputFormatChecker, VectorizedInputFormatInterface,
+  AcidInputFormat {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
@@ -249,9 +250,9 @@ public static SearchArgument createSarg(List types, Configuration
       getRecordReader(InputSplit inputSplit, JobConf conf,
                       Reporter reporter) throws IOException {
     if (isVectorMode(conf)) {
-      RecordReader vorr = voif.getRecordReader(inputSplit, conf,
-          reporter);
-      return (RecordReader) vorr;
+      return (RecordReader)
+          new VectorizedOrcInputFormat().getRecordReader(inputSplit, conf,
+              reporter);
     }
     FileSplit fSplit = (FileSplit)inputSplit;
     reporter.setStatus(fSplit.toString());
@@ -282,7 +283,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf,
                              ) throws IOException {
     if (isVectorMode(conf)) {
-      return voif.validateInput(fs, conf, files);
+      return new VectorizedOrcInputFormat().validateInput(fs, conf, files);
     }
 
     if (files.size() <= 0) {
@@ -910,4 +911,22 @@ private Object getMin(ColumnStatistics index) {
       this.types = types;
     }
   }
+
+  @Override
+  public RecordReader
+      getReader(InputSplit split,
+                AcidInputFormat.Options options) throws IOException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public RecordReader
+      getVectorizedReader(InputSplit split,
+                          AcidInputFormat.Options options) throws IOException {
+    // TODO
+    return null;
+  }
+
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 62e7b34..3cb6796 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -17,14 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -41,7 +48,7 @@
  * A Hive OutputFormat for ORC files.
 */
 public class OrcOutputFormat extends FileOutputFormat
-    implements HiveOutputFormat {
+    implements AcidOutputFormat {
 
   private static class OrcRecordWriter
       implements RecordWriter,
@@ -160,4 +167,122 @@ public SerDeStats getStats() {
     return new OrcRecordWriter(path, options);
   }
+
+  private class OrcRecordUpdater implements RecordUpdater {
+    private final Options options;
+    private final Path path;
+
+    private OrcRecordUpdater(Path path, Options options) {
+      this.options = options;
+      this.path = path;
+    }
+
+    @Override
+    public void insert(long currentTransaction, int bucket, Object row,
+                       ObjectInspector inspector) throws IOException {
+      System.out.println("insert " + path + " currTxn: " + currentTransaction +
+          " bucket: " + bucket + " obj: " + stringifyObject(row, inspector));
+    }
+
+    @Override
+    public void update(long currentTransaction, long originalTransaction,
+                       int originalBucket, long rowId, Object row,
+                       ObjectInspector inspector) throws IOException {
+      System.out.println("update " + path + " currTxn: " + currentTransaction +
+          " origTxn: " + originalTransaction + " bucket: " + originalBucket +
+          " row: " + rowId + " obj: " + stringifyObject(row, inspector));
+    }
+
+    @Override
+    public void delete(long currentTransaction, long originalTransaction,
+                       int originalBucket, long rowId) throws IOException {
+      System.out.println("delete " + path + " currTxn: " + currentTransaction +
+          " origTxn: " + originalTransaction + " bucket: " + originalBucket +
+          " row: " + rowId);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      System.out.println("flush " + path);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      System.out.println("close " + path);
+    }
+
+    @Override
+    public SerDeStats getStats() {
+      return null;
+    }
+
+    private void stringifyObject(StringBuilder buffer,
+                                 Object obj,
+                                 ObjectInspector inspector
+                                 ) throws IOException {
+      if (inspector instanceof StructObjectInspector) {
+        buffer.append("{ ");
+        StructObjectInspector soi = (StructObjectInspector) inspector;
+        boolean isFirst = true;
+        for(StructField field: soi.getAllStructFieldRefs()) {
+          if (isFirst) {
+            isFirst = false;
+          } else {
+            buffer.append(", ");
+          }
+          buffer.append(field.getFieldName());
+          buffer.append(": ");
+          stringifyObject(buffer, soi.getStructFieldData(obj, field),
+              field.getFieldObjectInspector());
+        }
+        buffer.append(" }");
+      } else if (inspector instanceof PrimitiveObjectInspector) {
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
+        buffer.append(poi.getPrimitiveJavaObject(obj).toString());
+      } else {
+        buffer.append("*unknown*");
+      }
+    }
+
+    private String stringifyObject(Object obj,
+                                   ObjectInspector inspector
+                                   ) throws IOException {
+      StringBuilder buffer = new StringBuilder();
+      stringifyObject(buffer, obj, inspector);
+      return buffer.toString();
+    }
+  }
+
+  @Override
+  public RecordUpdater getRecordUpdater(Path path,
+                                        Options options) throws IOException {
+    return new OrcRecordUpdater(path, options);
+  }
+
+  private static class MyRow {
+    private String col1;
+    private int col2;
+    MyRow(String col1, int col2) {
+      this.col1 = col1;
+      this.col2 = col2;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    OrcOutputFormat outf = new OrcOutputFormat();
+    final Configuration conf = new Configuration();
+    final ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    final TypeInfo typeInfo =
+        TypeInfoUtils.getTypeInfoFromTypeString(inspector.getTypeName());
+    RecordUpdater up = outf.getRecordUpdater(new Path("/hive/db/tbl/part"),
+        new AcidOutputFormat.Options(conf)
+            .typeInfo(typeInfo)
+            .bucket(0)
+            .minimumTransactionId(100)
+            .maximumTransactionId(200));
+    up.insert(1, 2, new MyRow("owen", 3), inspector);
+    up.update(1, 2, 3, 4, new MyRow("alan", 5), inspector);
+    up.delete(1, 2, 3, 4);
+    up.flush();
+    up.close(false);
+  }
 }
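
A minimal read-side sketch (not part of the patch above): the write side is already exercised by OrcOutputFormat.main(), and the snippet below shows how the AcidInputFormat.Options builder and getReader() introduced here might be driven. The table path and the AcidReadSketch class name are hypothetical, and OrcInputFormat.getReader() is still a TODO stub that returns null, so treat this as illustrative only.

import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class AcidReadSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Hypothetical table location, mirroring the path used in OrcOutputFormat.main().
    FileInputFormat.setInputPaths(conf, "/hive/db/tbl/part");

    OrcInputFormat inputFormat = new OrcInputFormat();
    InputSplit[] splits = inputFormat.getSplits(conf, 1);

    // Build read options the same way the write path builds AcidOutputFormat.Options.
    AcidInputFormat.Options options = new AcidInputFormat.Options(conf)
        .reporter(Reporter.NULL)
        .validTransactions(new AcidInputFormat.ValidTransactionSet());

    // Returns null until the TODO in OrcInputFormat.getReader() is filled in.
    RecordReader reader = inputFormat.getReader(splits[0], options);
  }
}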