diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index c6fb26c..ae90d57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -23,12 +23,17 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.acid.ParquetRecordUpdater;
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -40,6 +45,7 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import parquet.hadoop.ParquetOutputFormat;
 
 /**
  *
@@ -48,10 +54,12 @@
  *
  */
 public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
-    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
+    implements AcidOutputFormat<NullWritable, ParquetHiveRecord> {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
 
+  final static String ENABLE_ACID_SCHEMA_INFO = "hive.acid.parquet.enable";
+
   protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
 
   public MapredParquetOutputFormat() {
@@ -77,6 +85,42 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
     throw new RuntimeException("Should never be used");
   }
 
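+  /**
+   * Sets the Parquet write schema on the job conf. For an ACID write this is the ACID event
+   * struct rather than the plain table schema; e.g. for a table (a string, b string) it is:
+   *
+   * <pre>
+   * message hive_schema {
+   *   optional int32 OPERATION;
+   *   optional int64 ORIGINAL_TRANSACTION;
+   *   optional int32 BUCKET;
+   *   optional int64 ROW_ID;
+   *   optional int64 CURRENT_TRANSACTION;
+   *   optional group ROW { optional binary a (UTF8); optional binary b (UTF8); }
+   * }
+   * </pre>
+   */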
+  public void createEventSchema(final JobConf jobConf, final Properties tableProperties,
+      boolean isAcid) {
+    List<String> eventColumnNames = getColNameList(tableProperties, isAcid);
+    List<TypeInfo> eventColumnTypeProperty = getTypeInfoList(tableProperties, isAcid);
+    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(eventColumnNames,
+        eventColumnTypeProperty), jobConf);
+  }
+
+  private List<String> getColNameList(final Properties tableProperties, boolean isAcidSchema) {
+    if (isAcidSchema) {
+      return Arrays.asList(ParquetRecordUpdater.ACID_EVENT_ITEM_NAMES);
+    } else {
+      return (List<String>) StringUtils.getStringCollection(
+          tableProperties.getProperty(IOConstants.COLUMNS));
+    }
+  }
+
+  private List<TypeInfo> getTypeInfoList(final Properties tableProperties, boolean isAcidSchema) {
+    ArrayList<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(tableProperties
+        .getProperty(IOConstants.COLUMNS_TYPES));
+
+    if (isAcidSchema) {
+      ArrayList<String> columnNames = new ArrayList<>(
+          StringUtils.getStringCollection(tableProperties.getProperty(IOConstants.COLUMNS)));
+      StructTypeInfo rowType = new StructTypeInfo();
+      rowType.setAllStructFieldNames(columnNames);
+      rowType.setAllStructFieldTypeInfos(columnTypes);
+      List<TypeInfo> eventColumnTypeProperty = TypeInfoUtils.getTypeInfosFromTypeString(
+          ParquetRecordUpdater.ACID_EVENT_TYPEINFO_STR);
+      eventColumnTypeProperty.add(rowType);
+      return eventColumnTypeProperty;
+    } else {
+      return columnTypes;
+    }
+  }
+
   /**
    *
    * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
@@ -90,30 +134,13 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
       final boolean isCompressed,
       final Properties tableProperties,
       final Progressable progress) throws IOException {
-
-    LOG.info("creating new record writer..." + this);
-
-    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
-    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
-    List<String> columnNames;
-    List<TypeInfo> columnTypes;
-
-    if (columnNameProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNameProperty.split(","));
-    }
-
-    if (columnTypeProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    }
-
-    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
+    boolean isAcid = Boolean.valueOf(tableProperties.getProperty(ENABLE_ACID_SCHEMA_INFO, "false"));
+    createEventSchema(jobConf, tableProperties, isAcid);
 
     return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
-        progress,tableProperties);
+        progress, tableProperties);
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
@@ -126,4 +153,27 @@ protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
     return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
         progress,tableProperties);
   }
+
+  @Override
+  public RecordUpdater getRecordUpdater(Path path, final Options options) throws IOException {
+    Properties tableProperties = new Properties();
+    tableProperties.putAll(options.getTableProperties());
+    // If we are writing the base file, we don't need the acid info.
+    tableProperties.setProperty(ENABLE_ACID_SCHEMA_INFO, Boolean.toString(!options.isWritingBase()));
+
+    Path p = AcidUtils.createFilename(path, options);
+    FileSinkOperator.RecordWriter realWriter = getHiveRecordWriter((JobConf) options
+        .getConfiguration(), p, null, options.isCompressed(), tableProperties, null);
+    return new ParquetRecordUpdater(realWriter, options);
+  }
+
+  /**
+   * Used by MR compactor
+   */
+  @Override
+  public FileSinkOperator.RecordWriter getRawRecordWriter(Path path, Options options) throws
+      IOException {
+    //TODO
+    return null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java
new file mode 100644
index 0000000..53b48fd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.acid;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ObjectArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
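+/**
+ * A RecordUpdater that writes ACID events to Parquet. Each inserted row is wrapped in an event
+ * struct (operation, original transaction, bucket, row id, current transaction, row) and handed
+ * to the wrapped Parquet record writer. update and delete are not implemented yet.
+ */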
+public class ParquetRecordUpdater implements RecordUpdater {
+  final static int INSERT_OPERATION = 0;
+  final static int UPDATE_OPERATION = 1;
+  final static int DELETE_OPERATION = 2;
+
+  final static int OPERATION = 0;// int
+  final static int ORIGINAL_TRANSACTION = 1;// long
+  final static int BUCKET = 2;// int
+  final static int ROW_ID = 3;// long
+  final static int CURRENT_TRANSACTION = 4;// long
+  final static int ROW = 5;
+  final static int FIELDS = 6; // int
+
+  public final static String[] ACID_EVENT_ITEM_NAMES = new String[]{"OPERATION",
+      "ORIGINAL_TRANSACTION", "BUCKET", "ROW_ID", "CURRENT_TRANSACTION", "ROW"};
+  public final static String ACID_EVENT_TYPEINFO_STR = "int:bigint:int:bigint:bigint";
+
+  ObjectArrayWritableObjectInspector rowInspector;
+  FileSinkOperator.RecordWriter writer;
+
+  private int insertedRows = 0;
+
+  int operationVal = 1;
+  long originalTxnIDVal = 1;
+  int bucketVal = 1;
+  long rowIDVal = 1;
+  long currentTxnIDVal = 1;
+
+  Object[] item = new Object[FIELDS];
+
+  public void initialRowProps(AcidOutputFormat.Options options) {
+    List<String> names = new ArrayList<>();
+    names.add(OPERATION, ACID_EVENT_ITEM_NAMES[OPERATION]);
+    names.add(ORIGINAL_TRANSACTION, ACID_EVENT_ITEM_NAMES[ORIGINAL_TRANSACTION]);
+    names.add(BUCKET, ACID_EVENT_ITEM_NAMES[BUCKET]);
+    names.add(ROW_ID, ACID_EVENT_ITEM_NAMES[ROW_ID]);
+    names.add(CURRENT_TRANSACTION, ACID_EVENT_ITEM_NAMES[CURRENT_TRANSACTION]);
+    List<TypeInfo> typeInfos = new ArrayList<>();
+    typeInfos.add(OPERATION, TypeInfoFactory.intTypeInfo);
+    typeInfos.add(ORIGINAL_TRANSACTION, TypeInfoFactory.longTypeInfo);
+    typeInfos.add(BUCKET, TypeInfoFactory.intTypeInfo);
+    typeInfos.add(ROW_ID, TypeInfoFactory.longTypeInfo);
+    typeInfos.add(CURRENT_TRANSACTION, TypeInfoFactory.longTypeInfo);
+
+    StructTypeInfo rIdStructTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(names,
+        typeInfos);
+
+    ObjectArrayWritableObjectInspector eventInspector = new ObjectArrayWritableObjectInspector(
+        rIdStructTypeInfo);
+    List<StructField> fs = eventInspector.getFields();
+    fs.add(eventInspector.buildStructField(ACID_EVENT_ITEM_NAMES[ROW], options.getInspector(),
+        fs.size()));
+    rowInspector = new ObjectArrayWritableObjectInspector(fs);
+    bucketVal = options.getBucket();
+    item[OPERATION] = operationVal;
+    item[ORIGINAL_TRANSACTION] = originalTxnIDVal;
+    item[BUCKET] = bucketVal;
+    item[ROW_ID] = rowIDVal;
+  }
+
+  public ObjectArrayWritable getRow(ObjectArrayWritable writable) {
+    return (ObjectArrayWritable) rowInspector.getStructFieldData(writable,
+        rowInspector.getStructFieldRef(ACID_EVENT_ITEM_NAMES[ROW]));
+  }
+
+  public Integer getBucket(ObjectArrayWritable writable) {
+    return (Integer) rowInspector.getStructFieldData(writable,
+        rowInspector.getStructFieldRef(ACID_EVENT_ITEM_NAMES[BUCKET]));
+  }
+
+  public ParquetRecordUpdater(FileSinkOperator.RecordWriter realWriter,
+      AcidOutputFormat.Options options) throws IOException {
+    initialRowProps(options);
+    this.writer = realWriter;
+  }
+
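+  /**
+   * Wraps the row in the ACID event struct and writes it through the Parquet writer. Note that
+   * only item[ROW] is refreshed per call; the other event slots still carry the values set in
+   * initialRowProps.
+   */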
+  @Override
+  public void insert(long currentTransaction, Object row) throws IOException {
+    if (currentTransaction != currentTxnIDVal) {
+      insertedRows = 0;
+    }
+    operationVal = INSERT_OPERATION;
+    currentTxnIDVal = currentTransaction;
+    originalTxnIDVal = currentTransaction;
+    rowIDVal = insertedRows++;
+    item[ROW] = row;
+    ObjectArrayWritable writable = new ObjectArrayWritable(item);
+    ParquetHiveRecord record = new ParquetHiveRecord(writable, rowInspector);
+    writer.write(record);
+  }
+
+  @Override
+  public void update(long currentTransaction, Object row) throws IOException {
+    // TODO
+  }
+
+  @Override
+  public void delete(long currentTransaction, Object row) throws IOException {
+    // TODO
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // TODO find one way to flush the record
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    writer.close(abort);
+  }
+
+  @Override
+  public SerDeStats getStats() {
+    return null;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index f513572..e3a38b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -15,12 +15,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,9 +50,9 @@ import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
-import parquet.schema.MessageTypeParser;
 
 import com.google.common.base.Strings;
+import parquet.schema.MessageTypeParser;
 
 public class ParquetRecordReaderWrapper implements RecordReader {
   public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java
index 571f993..73ed620 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java
@@ -42,19 +42,15 @@
  */
 public class ObjectArrayWritableObjectInspector extends SettableStructObjectInspector {
-  private final TypeInfo typeInfo;
-  private final List<TypeInfo> fieldInfos;
   private final List<String> fieldNames;
   private final List<StructField> fields;
-  private final HashMap<String, StructFieldImpl> fieldsByName;
+  private final HashMap<String, StructField> fieldsByName;
 
   public ObjectArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) {
-
-    typeInfo = rowTypeInfo;
     fieldNames = rowTypeInfo.getAllStructFieldNames();
-    fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos();
-    fields = new ArrayList<StructField>(fieldNames.size());
-    fieldsByName = new HashMap<String, StructFieldImpl>();
+    List<TypeInfo> fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos();
+    fields = new ArrayList<>(fieldNames.size());
+    fieldsByName = new HashMap<>();
 
     for (int i = 0; i < fieldNames.size(); ++i) {
       final String name = fieldNames.get(i);
@@ -66,6 +62,16 @@ public ObjectArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) {
     }
   }
 
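+  /**
+   * Builds an inspector from an already prepared field list; used by ParquetRecordUpdater to
+   * append the ROW field (the table row inspector) after the ACID event fields.
+   */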
+  public ObjectArrayWritableObjectInspector(List<StructField> fields) {
+    fieldNames = new ArrayList<>();
+    this.fields = fields;
+    fieldsByName = new HashMap<>();
+    for (StructField field : fields) {
+      fieldsByName.put(field.getFieldName(), field);
+      fieldNames.add(field.getFieldName());
+    }
+  }
+
   private ObjectInspector getObjectInspector(final TypeInfo typeInfo) {
     if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
       return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
@@ -112,7 +118,14 @@ private ObjectInspector getObjectInspector(final TypeInfo typeInfo) {
     } else {
       throw new UnsupportedOperationException("Unknown field type: " + typeInfo);
     }
+  }
 
+  public List<StructField> getFields(){
+    return fields;
+  }
+
+  public StructField buildStructField(String fieldName, ObjectInspector inspector, int index) {
+    return new StructFieldImpl(fieldName, inspector, index);
   }
 
   @Override
@@ -122,7 +135,19 @@ public Category getCategory() {
 
   @Override
   public String getTypeName() {
-    return typeInfo.getTypeName();
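+    // The typeInfo member is gone, so rebuild the struct type name from the field list.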
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("struct<");
+    for (int i = 0; i < fields.size(); ++i) {
+      StructField field = fields.get(i);
+      if (i != 0) {
+        buffer.append(",");
+      }
+      buffer.append(field.getFieldName());
+      buffer.append(":");
+      buffer.append(field.getFieldObjectInspector().getTypeName());
+    }
+    buffer.append(">");
+    return buffer.toString();
   }
 
   @Override
@@ -188,25 +213,37 @@ public Object setStructFieldData(Object struct, StructField field, Object fieldV
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    final ObjectArrayWritableObjectInspector other = (ObjectArrayWritableObjectInspector) obj;
-    if (this.typeInfo != other.typeInfo && (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) {
+  public boolean equals(Object o) {
+    if (o == null || o.getClass() != getClass()) {
       return false;
+    } else if (o == this) {
+      return true;
+    } else {
+      List<StructField> other = ((ObjectArrayWritableObjectInspector) o).fields;
+      if (other.size() != fields.size()) {
+        return false;
+      }
+      for (int i = 0; i < fields.size(); ++i) {
+        StructField left = other.get(i);
+        StructField right = fields.get(i);
+        if (!(left.getFieldName().equalsIgnoreCase(right.getFieldName()) && left
+            .getFieldObjectInspector().equals(right.getFieldObjectInspector()))) {
+          return false;
+        }
+      }
+      return true;
     }
-    return true;
   }
 
   @Override
   public int hashCode() {
-    int hash = 5;
-    hash = 29 * hash + (this.typeInfo != null ? this.typeInfo.hashCode() : 0);
-    return hash;
+    int result = fields.size();
+    for (Object field : fields) {
+      if (field != null) {
+        result ^= field.hashCode();
+      }
+    }
+    return result;
   }
 
   class StructFieldImpl implements StructField {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java
new file mode 100644
index 0000000..a7ac17d
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hive.ql.io.parquet.acid;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Test;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestParquetRecordUpdater {
+  private static final Path tmpDir = new Path(System
+      .getProperty("test.tmp.dir", "target" + File.separator + "test" + File.separator + "tmp"));
+
+  final static List<String> values = Arrays
+      .asList("first", "second", "third", "fourth", "fifth", "sixth", "seventh", "eighth", "ninth",
+          "tenth");
+
+  static class MyRow {
+    Text col1;
+    Text col2;
+
+    MyRow(String val1, String val2) {
+      col1 = new Text(val1);
+      col2 = new Text(val2);
+    }
+  }
+
+  @Test public void testCreateEventSchema() {
+    MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat();
+    JobConf jobConf = new JobConf();
+    Properties tableProperties = new Properties();
+    String columnTypeProperty = "string:string";
+    String columnNameProperty = "a,b";
+    tableProperties.setProperty(IOConstants.COLUMNS, columnNameProperty);
+    tableProperties.setProperty(IOConstants.COLUMNS_TYPES, columnTypeProperty);
+    outputFormat.createEventSchema(jobConf, tableProperties, true);
+    String eventSchema = jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA);
+    MessageType realMsgType = MessageTypeParser.parseMessageType(eventSchema);
+    String s = "message hive_schema {\n"
+        + "  optional int32 OPERATION;\n"
+        + "  optional int64 ORIGINAL_TRANSACTION;\n"
+        + "  optional int32 BUCKET;\n"
+        + "  optional int64 ROW_ID;\n"
+        + "  optional int64 CURRENT_TRANSACTION;\n"
+        + "  optional group ROW {\n"
+        + "    optional binary a (UTF8);\n"
+        + "    optional binary b (UTF8);\n"
+        + "  }\n"
+        + "}";
+    MessageType expectedType = MessageTypeParser.parseMessageType(s);
+    Assert.assertEquals(expectedType, realMsgType);
+  }
+
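+  /**
+   * Writes ten transactions of ten rows each through getRecordUpdater, then reads the resulting
+   * delta files back with MapredParquetInputFormat and checks the bucket id and row contents.
+   */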
+  @Test public void testInsert() throws IOException {
+    final int BUCKET = 10;
+    Configuration conf = new JobConf();
+    Properties tblProps = new Properties();
+    tblProps.setProperty(IOConstants.COLUMNS, "val1,val2");
+    tblProps.setProperty(IOConstants.COLUMNS_TYPES, "string:string");
+    FileSystem fs = FileSystem.getLocal(conf);
+    Map<String, RecordUpdater> rus = new HashMap<>();
+
+    try {
+      MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat();
+
+      // initial inspector
+      ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+      // write the base
+      final AcidOutputFormat.Options options =
+          new AcidOutputFormat.Options(conf).inspector(inspector).bucket(BUCKET)
+              .tableProperties(tblProps).writingBase(false);
+
+      Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs);
+      fs.delete(root, true);
+
+      String inputPath = "";
+      int transactions = 10;
+      for (int i = 0; i < transactions; i++) {
+        options.minimumTransactionId(i);
+        options.maximumTransactionId(i);
+        Path p = AcidUtils.createFilename(root, options);
+        if (i != (transactions - 1)) {
+          inputPath += p.toUri().getPath() + ",";
+        } else {
+          inputPath += p.toUri().getPath();
+        }
+        fs.delete(p, true);
+        RecordUpdater ru = outputFormat.getRecordUpdater(root, options);
+        for (int j = 0; j < values.size(); j++) {
+          ru.insert(i, new MyRow(values.get(j), values.get((j + 1) % values.size())));
+        }
+        ru.close(false);
+        rus.put(p.getName(), ru);
+      }
+
+      JobConf jobConf = new JobConf(conf);
+      jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, inputPath);
+      MapredParquetInputFormat inputFormat = new MapredParquetInputFormat();
+      InputSplit[] splits = inputFormat.getSplits(jobConf, transactions);
+
+      for (InputSplit s : splits) {
+        RecordReader recordReader = inputFormat.getRecordReader(s, jobConf, null);
+        ObjectArrayWritable value = new ObjectArrayWritable(new Object[6]);
+        boolean hasMore;
+        int i = 0;
+        do {
+          hasMore = recordReader.next(null, value);
+          RecordUpdater ru = rus.get(((FileSplit) s).getPath().getName());
+          ObjectArrayWritable myRow = ((ParquetRecordUpdater) ru).getRow(value);
+          if (myRow != null && myRow.get() != null) {
+            int bucketID = ((ParquetRecordUpdater) ru).getBucket(value);
+            Assert.assertEquals(bucketID, BUCKET);
+            Text col1 = (Text) myRow.get()[0];
+            Assert.assertEquals(values.get((values.indexOf((col1.toString())) + 1) % values.size()),
+                myRow.get()[1].toString());
+          }
+        } while (hasMore);
+      }
+    } finally {
+      for (String p : rus.keySet()) {
+        fs.delete(new Path(p), true);
+      }
+    }
+  }
+}
diff --git a/ql/src/test/queries/clientpositive/acid_parquet_insert.q b/ql/src/test/queries/clientpositive/acid_parquet_insert.q
new file mode 100644
index 0000000..92a76d5
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_parquet_insert.q
@@ -0,0 +1,10 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.compactor.initiator.on=true;
+
+CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true");
+
+INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300);
+INSERT INTO TABLE numbers_bucketed VALUES(10, 1000);
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/acid_parquet_insert.q.out b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out
new file mode 100644
index 0000000..1445df8
--- /dev/null
+++ b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out
@@ -0,0 +1,28 @@
+PREHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@numbers_bucketed
+PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@numbers_bucketed
+POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@numbers_bucketed
+POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]