diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index c6fb26c..8e7924e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -23,12 +23,16 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.acid.ParquetRecordUpdater;
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -48,10 +52,12 @@
  *
  */
 public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, ParquetHiveRecord>
-    implements HiveOutputFormat<NullWritable, ParquetHiveRecord> {
+    implements AcidOutputFormat<NullWritable, ParquetHiveRecord> {

   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);

+  final static String ENABLEACIDSCHEMAINFO = "hive.acid.parquet.enable";
+
   protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;

   public MapredParquetOutputFormat() {
@@ -77,6 +83,53 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
     throw new RuntimeException("Should never be used");
   }

+  public void createEventSchema(final JobConf jobConf, final Properties tableProperties,
+      boolean isAcid) {
+    List<String> eventColumnNames = getColNameList(tableProperties, isAcid);
+    List<TypeInfo> eventColumnTypeProperty = getTypeInfoList(tableProperties, isAcid);
+    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(eventColumnNames,
+        eventColumnTypeProperty), jobConf);
+  }
+
+  private List<String> getColNameList(final Properties tableProperties, boolean isAcidSchema) {
+    if (isAcidSchema) {
+      return Arrays.asList(ParquetRecordUpdater.ACID_EVENT_ITEM_NAMES);
+    } else {
+      final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+      ArrayList<String> columnNames = new ArrayList<>();
+      if (columnNameProperty.length() != 0) {
+        columnNames.addAll(Arrays.asList(columnNameProperty.split(",")));
+      }
+      return columnNames;
+    }
+  }
+
+  private List<TypeInfo> getTypeInfoList(final Properties tableProperties, boolean isAcidSchema) {
+    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
+    ArrayList<TypeInfo> columnTypes = new ArrayList<>();
+    if (columnTypeProperty.length() != 0) {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    if (isAcidSchema) {
+      final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+      ArrayList<String> columnNames = new ArrayList<>();
+      if (columnNameProperty.length() != 0) {
+        columnNames.addAll(Arrays.asList(columnNameProperty.split(",")));
+      }
+
+      StructTypeInfo rowType = new StructTypeInfo();
+      rowType.setAllStructFieldNames(columnNames);
+      rowType.setAllStructFieldTypeInfos(columnTypes);
+      List<TypeInfo> eventColumnTypeProperty = TypeInfoUtils.getTypeInfosFromTypeString(
+          ParquetRecordUpdater.ACID_EVENT_TYPEINFO_STR);
+      eventColumnTypeProperty.add(rowType);
+      return eventColumnTypeProperty;
+    } else {
+      return columnTypes;
+    }
+  }
+
   /**
    *
    * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
@@ -90,27 +143,11 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
       final boolean isCompressed,
       final Properties tableProperties,
       final Progressable progress) throws IOException {
-    LOG.info("creating new record writer..." + this);
-    final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
-    final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
-    List<String> columnNames;
-    List<TypeInfo> columnTypes;
-
-    if (columnNameProperty.length() == 0) {
-      columnNames = new ArrayList<String>();
-    } else {
-      columnNames = Arrays.asList(columnNameProperty.split(","));
-    }
-
-    if (columnTypeProperty.length() == 0) {
-      columnTypes = new ArrayList<TypeInfo>();
-    } else {
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    }
+    boolean isAcid = Boolean.valueOf(tableProperties.getProperty(ENABLEACIDSCHEMAINFO, "false"));
+    createEventSchema(jobConf, tableProperties, isAcid);

-    DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
     return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
         progress,tableProperties);
@@ -126,4 +163,26 @@ protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
     return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(),
         progress,tableProperties);
   }
+
+  @Override
+  public RecordUpdater getRecordUpdater(Path path, final Options options) throws IOException {
+    Properties tableProperties = new Properties();
+    tableProperties.putAll(options.getTableProperties());
+    // If we are writing the base file, we don't need the acid info.
+    tableProperties.setProperty(ENABLEACIDSCHEMAINFO, Boolean.toString(!options.isWritingBase()));
+
+    FileSinkOperator.RecordWriter realWriter = getHiveRecordWriter((JobConf) options
+        .getConfiguration(), path, null, options.isCompressed(), tableProperties, null);
+    return new ParquetRecordUpdater(realWriter, options);
+  }
+
+  /**
+   * Used by MR compactor
+   */
+  @Override
+  public FileSinkOperator.RecordWriter getRawRecordWriter(Path path, Options options) throws
+      IOException {
+    //TODO
+    return null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java
new file mode 100644
index 0000000..531775f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.acid;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetStructObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A RecordUpdater that wraps each row in an ACID event struct
+ * (operation, original transaction, bucket, row id, current transaction, row)
+ * and writes the event through the wrapped Parquet record writer.
+ */
+public class ParquetRecordUpdater implements RecordUpdater {
+  final static int INSERT_OPERATION = 0;
+  final static int UPDATE_OPERATION = 1;
+  final static int DELETE_OPERATION = 2;
+
+  final static int OPERATION = 0;            // int
+  final static int ORIGINAL_TRANSACTION = 1; // long
+  final static int BUCKET = 2;               // int
+  final static int ROW_ID = 3;               // long
+  final static int CURRENT_TRANSACTION = 4;  // long
+  final static int ROW = 5;
+  final static int FIELDS = 6;               // int
+
+  public final static String[] ACID_EVENT_ITEM_NAMES = new String[]{"OPERATION",
+      "ORIGINAL_TRANSACTION", "BUCKET", "ROW_ID", "CURRENT_TRANSACTION", "ROW"};
+  public final static String ACID_EVENT_TYPEINFO_STR = "int:bigint:int:bigint:bigint";
+
+  ParquetStructObjectInspector rowInspector;
+  FileSinkOperator.RecordWriter writer;
+
+  private int insertedRows = 0;
+
+  int operationVal = 1;
+  long originalTxnIDVal = 1;
+  int bucketVal = 1;
+  long rowIDVal = 1;
+  long currentTxnIDVal = 1;
+
+  Object[] item = new Object[FIELDS];
+
+  public void initialRowProps(AcidOutputFormat.Options options) {
+    List<String> names = new ArrayList<>();
+    names.add(OPERATION, ACID_EVENT_ITEM_NAMES[OPERATION]);
+    names.add(ORIGINAL_TRANSACTION, ACID_EVENT_ITEM_NAMES[ORIGINAL_TRANSACTION]);
+    names.add(BUCKET, ACID_EVENT_ITEM_NAMES[BUCKET]);
+    names.add(ROW_ID, ACID_EVENT_ITEM_NAMES[ROW_ID]);
+    names.add(CURRENT_TRANSACTION, ACID_EVENT_ITEM_NAMES[CURRENT_TRANSACTION]);
+    List<TypeInfo> typeInfos = new ArrayList<>();
+    typeInfos.add(OPERATION, TypeInfoFactory.intTypeInfo);
+    typeInfos.add(ORIGINAL_TRANSACTION, TypeInfoFactory.longTypeInfo);
+    typeInfos.add(BUCKET, TypeInfoFactory.intTypeInfo);
+    typeInfos.add(ROW_ID, TypeInfoFactory.longTypeInfo);
+    typeInfos.add(CURRENT_TRANSACTION, TypeInfoFactory.longTypeInfo);
+
+    StructTypeInfo rIdStructTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(names,
+        typeInfos);
+
+    ParquetStructObjectInspector eventInspector = new ParquetStructObjectInspector(rIdStructTypeInfo);
+    List<StructField> fs = eventInspector.getFields();
+    fs.add(eventInspector.buildStructField(ACID_EVENT_ITEM_NAMES[ROW], options.getInspector(), fs.size()));
+    rowInspector = new ParquetStructObjectInspector(fs);
+    bucketVal = options.getBucket();
+    item[OPERATION] = operationVal;
+    item[ORIGINAL_TRANSACTION] = originalTxnIDVal;
+    item[BUCKET] = bucketVal;
+    item[ROW_ID] = rowIDVal;
+  }
+
+  public ObjectArrayWritable getRow(ObjectArrayWritable writable) {
+    return (ObjectArrayWritable) rowInspector.getStructFieldData(writable, rowInspector
+        .getStructFieldRef(ACID_EVENT_ITEM_NAMES[ROW]));
+  }
+
+  public Integer getBucket(ObjectArrayWritable writable) {
+    return (Integer) rowInspector.getStructFieldData(writable, rowInspector.getStructFieldRef(
+        ACID_EVENT_ITEM_NAMES[BUCKET]));
+  }
+
+  public ParquetRecordUpdater(FileSinkOperator.RecordWriter realWriter,
+      AcidOutputFormat.Options options) throws IOException {
+    initialRowProps(options);
+    this.writer = realWriter;
+  }
+
+  @Override
+  public void insert(long currentTransaction, Object row) throws IOException {
+    if (currentTransaction != currentTxnIDVal) {
+      insertedRows = 0;
+    }
+    operationVal = INSERT_OPERATION;
+    currentTxnIDVal = currentTransaction;
+    originalTxnIDVal = currentTransaction;
+    rowIDVal = insertedRows++;
+    // Refresh the event metadata columns before handing the row to the writer.
+    item[OPERATION] = operationVal;
+    item[ORIGINAL_TRANSACTION] = originalTxnIDVal;
+    item[CURRENT_TRANSACTION] = currentTxnIDVal;
+    item[ROW_ID] = rowIDVal;
+    item[ROW] = row;
+    ObjectArrayWritable writable = new ObjectArrayWritable(item);
+    ParquetHiveRecord record = new ParquetHiveRecord(writable, rowInspector);
+    writer.write(record);
+  }
+
+  @Override
+  public void update(long currentTransaction, Object row) throws IOException {
+    // TODO
+  }
+
+  @Override
+  public void delete(long currentTransaction, Object row) throws IOException {
+    // TODO
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Parquet cannot flush a partially written file, so flush closes the underlying writer.
+    close(false);
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    writer.close(abort);
+  }
+
+  @Override
+  public SerDeStats getStats() {
+    // Stats collection is not implemented yet.
+    return null;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index f513572..ca69a68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -15,12 +15,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +30,7 @@
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -55,7 +51,6 @@
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ContextUtil;
-import parquet.schema.MessageTypeParser;

 import com.google.common.base.Strings;
@@ -132,7 +127,7 @@ public ParquetRecordReaderWrapper(
       eof = true;
     }
     if (valueObj == null) { // Should initialize the value for createValue
-      valueObj = new ObjectArrayWritable(new Object[schemaSize]);
+      valueObj = new ObjectArrayWritable(new Writable[schemaSize]);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java
new file mode 100644
index 0000000..65172ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.serde;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+
+/**
+ * The ParquetStructObjectInspector inspects an ObjectArrayWritable, considering it as a Hive
+ * struct.
+ * It can also inspect a List if Hive decides to inspect the result of an inspection.
+ */
+public class ParquetStructObjectInspector extends SettableStructObjectInspector {
+
+  private final List<String> fieldNames;
+  private final List<StructField> fields;
+  private final Map<String, StructField> fieldsByName;
+
+  public ParquetStructObjectInspector(final StructTypeInfo rowTypeInfo) {
+    fieldNames = rowTypeInfo.getAllStructFieldNames();
+    final List<TypeInfo> fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos();
+    fields = new ArrayList<>(fieldNames.size());
+    fieldsByName = new HashMap<>();
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      final String name = fieldNames.get(i);
+      final TypeInfo fieldInfo = fieldInfos.get(i);
+
+      final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i);
+      fields.add(field);
+      fieldsByName.put(name, field);
+    }
+  }
+
+  public ParquetStructObjectInspector(List<StructField> fields) {
+    fieldNames = new ArrayList<>();
+    this.fields = fields;
+    fieldsByName = new HashMap<>();
+    for (StructField field : fields) {
+      fieldsByName.put(field.getFieldName(), field);
+      fieldNames.add(field.getFieldName());
+    }
+  }
+
+  public List<StructField> getFields() {
+    return fields;
+  }
+
+  public StructField buildStructField(String fieldName, ObjectInspector inspector, int index) {
+    return new StructFieldImpl(fieldName, inspector, index);
+  }
+
+  private ObjectInspector getObjectInspector(final TypeInfo typeInfo) {
+    if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.javaFloatObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+      return ParquetPrimitiveInspectorFactory.parquetStringInspector;
+    } else if (typeInfo instanceof DecimalTypeInfo) {
+      return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          (DecimalTypeInfo) typeInfo);
+    } else if (typeInfo.getCategory().equals(Category.STRUCT)) {
+      return new ParquetStructObjectInspector((StructTypeInfo) typeInfo);
+    } else if (typeInfo.getCategory().equals(Category.LIST)) {
+      final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+      return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo));
+    } else if (typeInfo.getCategory().equals(Category.MAP)) {
+      final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+      final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
+      if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals(
+          TypeInfoFactory.byteTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) {
+        return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo),
+            getObjectInspector(valueTypeInfo));
+      } else {
+        return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo),
+            getObjectInspector(valueTypeInfo));
+      }
+    } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+      return ParquetPrimitiveInspectorFactory.parquetByteInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) {
+      return ParquetPrimitiveInspectorFactory.parquetShortInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableDateObjectInspector;
+    } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) {
+      return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          (CharTypeInfo) typeInfo);
+    } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
+      return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          (VarcharTypeInfo) typeInfo);
+    } else {
+      throw new UnsupportedOperationException("Unknown field type: " + typeInfo);
+    }
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.STRUCT;
+  }
+
+  @Override
+  public String getTypeName() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("struct<");
+    for (int i = 0; i < fields.size(); ++i) {
+      StructField field = fields.get(i);
+      if (i != 0) {
+        buffer.append(",");
+      }
+      buffer.append(field.getFieldName());
+      buffer.append(":");
+      buffer.append(field.getFieldObjectInspector().getTypeName());
+    }
+    buffer.append(">");
+    return buffer.toString();
+  }
+
+  @Override
+  public List<? extends StructField> getAllStructFieldRefs() {
+    return fields;
+  }
+
+  @Override
+  public Object getStructFieldData(final Object data, final StructField fieldRef) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ObjectArrayWritable) {
+      final ObjectArrayWritable arr = (ObjectArrayWritable) data;
+      return arr.get()[((StructFieldImpl) fieldRef).getIndex()];
+    }
+
+    //since setStructFieldData and create return a list, getStructFieldData should be able to
+    //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde
+    //is something else.
+    if (data instanceof List) {
+      return ((List) data).get(((StructFieldImpl) fieldRef).getIndex());
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public StructField getStructFieldRef(final String name) {
+    return fieldsByName.get(name);
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(final Object data) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ObjectArrayWritable) {
+      final ObjectArrayWritable arr = (ObjectArrayWritable) data;
+      final Object[] arrObjects = arr.get();
+      return Arrays.asList(arrObjects);
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public Object create() {
+    final ArrayList<Object> list = new ArrayList<Object>(fields.size());
+    for (int i = 0; i < fields.size(); ++i) {
+      list.add(null);
+    }
+    return list;
+  }
+
+  @Override
+  public Object setStructFieldData(Object struct, StructField field, Object fieldValue) {
+    final ArrayList<Object> list = (ArrayList<Object>) struct;
+    list.set(((StructFieldImpl) field).getIndex(), fieldValue);
+    return list;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    } else if (o == this) {
+      return true;
+    } else {
+      List<StructField> other = ((ParquetStructObjectInspector) o).fields;
+      if (other.size() != fields.size()) {
+        return false;
+      }
+      for (int i = 0; i < fields.size(); ++i) {
+        StructField left = other.get(i);
+        StructField right = fields.get(i);
+        if (!(left.getFieldName().equalsIgnoreCase(right.getFieldName()) && left
+            .getFieldObjectInspector().equals(right.getFieldObjectInspector()))) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int result = fields.size();
+    for (Object field : fields) {
+      if (field != null) {
+        result ^= field.hashCode();
+      }
+    }
+    return result;
+  }
+
+  class StructFieldImpl implements StructField {
+
+    private final String name;
+    private final ObjectInspector inspector;
+    private final int index;
+
+    public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) {
+      this.name = name;
+      this.inspector = inspector;
+      this.index = index;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return "";
+    }
+
+    @Override
+    public String getFieldName() {
+      return name;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return inspector;
+    }
+
+    @Override
+    public int getFieldID() {
+      return index;
+    }
+  }
+}
\ No newline at end of file
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java
new file mode 100644
index 0000000..fe531a7
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java
@@ -0,0 +1,149 @@
+package org.apache.hadoop.hive.ql.io.parquet.acid;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Test;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+public class TestParquetRecordUpdater {
+  private static final Path tmpDir = new Path(System.getProperty("test.tmp.dir", "target" + File
+      .separator + "test" + File.separator + "tmp"));
+
+  final static List<String> values = Arrays.asList("first", "second", "third", "fourth", "fifth",
+      "sixth", "seventh", "eighth", "ninth", "tenth");
+
+  static class MyRow {
+    Text col1;
+    Text col2;
+
+    MyRow(String val1, String val2) {
+      col1 = new Text(val1);
+      col2 = new Text(val2);
+    }
+  }
+
+  @Test
+  public void testCreateEventSchema() {
+    MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat();
+    JobConf jobConf = new JobConf();
+    Properties tableProperties = new Properties();
+    String columnTypeProperty = "string:string";
+    String columnNameProperty = "a,b";
+    tableProperties.setProperty(IOConstants.COLUMNS, columnNameProperty);
+    tableProperties.setProperty(IOConstants.COLUMNS_TYPES, columnTypeProperty);
+    outputFormat.createEventSchema(jobConf, tableProperties, true);
+    String eventSchema = jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA);
+    MessageType realMsgType = MessageTypeParser.parseMessageType(eventSchema);
+    String s = "message hive_schema {\n" +
+        "  optional int32 OPERATION;\n" +
+        "  optional int64 ORIGINAL_TRANSACTION;\n" +
+        "  optional int32 BUCKET;\n" +
+        "  optional int64 ROW_ID;\n" +
+        "  optional int64 CURRENT_TRANSACTION;\n" +
+        "  optional group ROW {\n" +
+        "    optional binary a (UTF8);\n" +
+        "    optional binary b (UTF8);\n" +
+        "  }\n" +
+        "}";
+    MessageType expectedType = MessageTypeParser.parseMessageType(s);
+    Assert.assertEquals(expectedType, realMsgType);
+  }
+
+  @Test
+  public void testInsert() throws IOException {
+    final int BUCKET = 10;
+    Configuration conf = new JobConf();
+    Properties tblProps = new Properties();
+    tblProps.setProperty(IOConstants.COLUMNS, "val1,val2");
+    tblProps.setProperty(IOConstants.COLUMNS_TYPES, "string:string");
+    FileSystem fs = FileSystem.getLocal(conf);
+    Map<String, RecordUpdater> rus = new HashMap<>();
+
+    try {
+      MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat();
+
+      // initial inspector
+      ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(MyRow
+          .class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+      // write deltas (writingBase(false)), so rows are wrapped in the ACID event schema
+      final AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).inspector(
+          inspector).bucket(BUCKET).tableProperties(tblProps).writingBase(false);
+
+      Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs);
+      fs.delete(root, true);
+
+      String inputPath = "";
+      int numOfSplit = 10;
+      for (int i = 0; i < numOfSplit; i++) {
+        String pathName = root.getName() + i + "_" + (i + 1);
+        System.out.println("The " + i + " writer with path name is " + pathName);
+        Path p = new Path(pathName);
+        // Build a comma-separated input dir list covering every file that is written.
+        inputPath += p.getName();
+        if (i != (numOfSplit - 1)) {
+          inputPath += ",";
+        }
+        fs.delete(p, true);
+        RecordUpdater ru = outputFormat.getRecordUpdater(p, options.maximumTransactionId(100));
+        for (int j = 0; j < values.size(); j++) {
+          ru.insert(0, new MyRow(values.get(j), values.get((j + 1) % values.size())));
+        }
+        ru.flush();
+        rus.put(p.getName(), ru);
+      }
+
+      JobConf jobConf = new JobConf(conf);
+      jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, inputPath);
+      MapredParquetInputFormat inputFormat = new MapredParquetInputFormat();
+      InputSplit[] splits = inputFormat.getSplits(jobConf, numOfSplit);
+
+      for (InputSplit s : splits) {
+        RecordReader recordReader = inputFormat.getRecordReader(s, jobConf, null);
+        ObjectArrayWritable value = new ObjectArrayWritable(new Object[6]);
+        boolean hasMore;
+        int i = 0;
+        do {
+          hasMore = recordReader.next(null, value);
+          RecordUpdater ru = rus.get(((FileSplit) s).getPath().getName());
+          ObjectArrayWritable myRow = ((ParquetRecordUpdater) ru).getRow(value);
+          if (myRow != null && myRow.get() != null) {
+            int bucketID = ((ParquetRecordUpdater) ru).getBucket(value);
+            Assert.assertEquals(bucketID, BUCKET);
+            Text col1 = (Text) myRow.get()[0];
+            Assert.assertEquals(values.get((values.indexOf(col1.toString()) + 1) % values.size()),
+                myRow.get()[1].toString());
+          }
+        } while (hasMore);
+      }
+    } finally {
+      for (String p : rus.keySet()) {
+        fs.delete(new Path(p), true);
+      }
+    }
+  }
+}
diff --git a/ql/src/test/queries/clientpositive/acid_parquet_insert.q b/ql/src/test/queries/clientpositive/acid_parquet_insert.q
new file mode 100644
index 0000000..92a76d5
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_parquet_insert.q
@@ -0,0 +1,10 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.compactor.initiator.on=true;
+
+CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true");
+
+INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300);
+INSERT INTO TABLE numbers_bucketed VALUES(10, 1000);
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/acid_parquet_insert.q.out b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out
new file mode 100644
index 0000000..1445df8
--- /dev/null
+++ b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out
@@ -0,0 +1,28 @@
+PREHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@numbers_bucketed
+PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@numbers_bucketed
+POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@numbers_bucketed
+POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@numbers_bucketed
+POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]