diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index 000eb38..926d5c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -14,14 +14,26 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import java.util.ArrayList; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; import parquet.hadoop.ParquetInputFormat; @@ -34,7 +46,7 @@ * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization. */ -public class MapredParquetInputFormat extends FileInputFormat { +public class MapredParquetInputFormat extends FileInputFormat implements AcidInputFormat { private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class); @@ -43,7 +55,7 @@ private final transient VectorizedParquetInputFormat vectorizedSelf; public MapredParquetInputFormat() { - this(new ParquetInputFormat(DataWritableReadSupport.class)); + this(new ParquetInputFormat<>(DataWritableReadSupport.class)); } protected MapredParquetInputFormat(final ParquetInputFormat inputFormat) { @@ -53,7 +65,7 @@ protected MapredParquetInputFormat(final ParquetInputFormat @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public org.apache.hadoop.mapred.RecordReader getRecordReader( + public org.apache.hadoop.mapred.RecordReader getRecordReader( final org.apache.hadoop.mapred.InputSplit split, final org.apache.hadoop.mapred.JobConf job, final org.apache.hadoop.mapred.Reporter reporter @@ -69,11 +81,32 @@ protected MapredParquetInputFormat(final ParquetInputFormat if (LOG.isDebugEnabled()) { LOG.debug("Using row-mode record reader"); } - return (RecordReader) - new ParquetRecordReaderWrapper(realInput, split, job, reporter); + return new ParquetRecordReaderWrapper(realInput, split, job, reporter); } } catch (final InterruptedException e) { throw new RuntimeException("Cannot create a RecordReaderWrapper", e); } } + + @Override + public RowReader getReader(InputSplit split, Options options) throws IOException { + //TODO + return null; + } + + @Override + public RawReader getRawReader(Configuration conf, boolean collapseEvents, + int bucket, ValidTxnList validTxnList, + Path baseDirectory, + Path[] deltaDirectory) throws IOException { + //TODO + return null; + } + + @Override + public boolean validateInput(FileSystem fs, HiveConf conf, + ArrayList 
files) throws IOException { + //TODO + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java index 8380117..ada140a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java @@ -23,15 +23,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.parquet.acid.ParquetRecordUpdater; import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; @@ -42,12 +48,10 @@ import parquet.hadoop.ParquetOutputFormat; /** - * * A Parquet OutputFormat for Hive (with the deprecated package mapred) - * */ -public class MapredParquetOutputFormat extends FileOutputFormat implements - HiveOutputFormat { +public class MapredParquetOutputFormat extends FileOutputFormat + implements AcidOutputFormat { private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class); @@ -57,72 +61,98 @@ public MapredParquetOutputFormat() { realOutputFormat = new ParquetOutputFormat(new DataWritableWriteSupport()); } - public MapredParquetOutputFormat(final OutputFormat mapreduceOutputFormat) { + public MapredParquetOutputFormat( + final OutputFormat mapreduceOutputFormat) { realOutputFormat = (ParquetOutputFormat) mapreduceOutputFormat; } @Override public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException { - realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null)); + realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext + (job, null)); } @Override - public RecordWriter getRecordWriter( - final FileSystem ignored, - final JobConf job, - final String name, - final Progressable progress - ) throws IOException { + public RecordWriter getRecordWriter(final FileSystem ignored, + final JobConf job, final + String name, final Progressable progress) throws IOException { throw new RuntimeException("Should never be used"); } - /** - * - * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which - * contains the real output format - */ - @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( - final JobConf jobConf, - final Path finalOutPath, - final Class valueClass, - final boolean isCompressed, - final Properties tableProperties, 
- final Progressable progress) throws IOException { + public void createEventSchema(final JobConf jobConf, final Properties tableProperties) { + List eventColumnNames = getColNameList(); + List eventColumnTypeProperty = getTypeInfoList(tableProperties); + DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(eventColumnNames, + eventColumnTypeProperty), jobConf); + } - LOG.info("creating new record writer..." + this); + private List getColNameList() { + String eventColumnNameProperty = "operation,orignalTxnID,BUCKET,rowID,currentTxnID,ROW"; + return Arrays.asList(eventColumnNameProperty.split(",")); + } + private List getTypeInfoList(final Properties tableProperties) { final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS); final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES); - List columnNames; - List columnTypes; + ArrayList columnNames = new ArrayList<>(); + ArrayList columnTypes = new ArrayList<>(); - if (columnNameProperty.length() == 0) { - columnNames = new ArrayList(); - } else { - columnNames = Arrays.asList(columnNameProperty.split(",")); + if (columnNameProperty.length() != 0) { + columnNames.addAll(Arrays.asList(columnNameProperty.split(","))); } - if (columnTypeProperty.length() == 0) { - columnTypes = new ArrayList(); - } else { + if (columnTypeProperty.length() != 0) { columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); } + StructTypeInfo rowType = new StructTypeInfo(); + rowType.setAllStructFieldNames(columnNames); + rowType.setAllStructFieldTypeInfos(columnTypes); + List eventColumnTypeProperty = TypeInfoUtils.getTypeInfosFromTypeString + ("int:bigint:int:bigint:bigint"); + eventColumnTypeProperty.add(rowType); + return eventColumnTypeProperty; + } - DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf); + /** + * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which + * contains the real output format + */ + @Override + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + final JobConf jobConf, final Path finalOutPath, final Class valueClass, + final boolean isCompressed, final Properties tableProperties, + final Progressable progress) throws IOException { + LOG.info("creating new record writer..." 
+ this); return getParquerRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), - progress,tableProperties); + progress, tableProperties); } protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper( - ParquetOutputFormat realOutputFormat, - JobConf jobConf, - String finalOutPath, - Progressable progress, - Properties tableProperties - ) throws IOException { + ParquetOutputFormat realOutputFormat, JobConf jobConf, String finalOutPath, + Progressable progress, Properties tableProperties) throws IOException { return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), - progress,tableProperties); + progress, tableProperties); + } + + @Override + public RecordUpdater getRecordUpdater(Path path, final Options options) throws IOException { + Properties tableProperties = new Properties(); + tableProperties.putAll(options.getTableProperties()); + + createEventSchema((JobConf) options.getConfiguration(),tableProperties); + FileSinkOperator.RecordWriter realWriter = getHiveRecordWriter((JobConf) options + .getConfiguration(), path, null, options.isCompressed(), tableProperties, null); + return new ParquetRecordUpdater(realWriter, options); + } + + /** + * Used by mr compactor + */ + @Override + public FileSinkOperator.RecordWriter getRawRecordWriter(Path path, Options options) throws + IOException { + //TODO + return null; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 4e1820c..c557963 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -52,7 +52,7 @@ private final ParquetRecordReaderWrapper internalReader; private VectorizedRowBatchCtx rbCtx; private ObjectArrayWritable internalValues; - private Void internalKey; + private NullWritable internalKey; private VectorColumnAssign[] assigners; public VectorizedParquetRecordReader( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRawRecordMerger.java new file mode 100644 index 0000000..eba8358 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRawRecordMerger.java @@ -0,0 +1,94 @@ +package org.apache.hadoop.hive.ql.io.parquet.acid; + +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.ParquetRecordReader; + +import java.io.IOException; +import java.util.Map; + +public class ParquetRawRecordMerger implements AcidInputFormat.RawReader{ + private Map readers; + + @Override + public ObjectInspector getObjectInspector() { + return null; + } + + @Override + public boolean isDelete(ArrayWritable value) { + return false; + } + + @Override + public boolean next(RecordIdentifier key, ArrayWritable value) throws IOException { + return false; + } + + @Override + public RecordIdentifier createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return null; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws 
IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + static class ReaderPair{ + ReaderKey maxKey; + ParquetHiveRecord record; + ParquetRecordReader recordReader; + ParquetFileReader reader; + + public ReaderPair(ReaderKey minKey){ + //advance the record bigger than the minimal key + + } + + public void next() throws IOException, InterruptedException { + if (recordReader.nextKeyValue()) { + record = recordReader.getCurrentValue(); + } + } + } + + /** + * A RecordIdentifier extended with the current transaction id. This is the + * key of our merge sort with the originalTransaction, bucket, and rowId + * ascending and the currentTransaction descending. This means that if the + * reader is collapsing events to just the last update, just the first + * instance of each record is required. + */ + final static class ReaderKey extends RecordIdentifier { + private long currentTransactionId; + + public ReaderKey() { + this(-1, -1, -1, -1); + } + + public ReaderKey(long originalTransaction, int bucket, long rowId, long currentTransactionId) { + super(originalTransaction, bucket, rowId); + this.currentTransactionId = currentTransactionId; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java new file mode 100644 index 0000000..f2db6f2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/acid/ParquetRecordUpdater.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.io.parquet.acid; + +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetStructObjectInspector; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ParquetRecordUpdater implements RecordUpdater { + final static int INSERT_OPERATION = 0; + final static int UPDATE_OPERATION = 1; + final static int DELETE_OPERATION = 2; + + final static int OPERATION = 0;// int + final static int ORIGINAL_TRANSACTION = 1;// long + final static int BUCKET = 2;// int + final static int ROW_ID = 3;// long + final static int CURRENT_TRANSACTION = 4;// long + final static int ROW = 5; + final static int FIELDS = 6; // int + + ParquetStructObjectInspector rowInspector; + FileSinkOperator.RecordWriter writer; + + private int insertedRows = 0; + + int operationVal = 1; + long originalTxnIDVal = 1; + int bucketVal = 1; + long rowIDVal = 1; + long currentTxnIDVal = 1; + + Object[] item = new Object[FIELDS]; + + public void initialRowProps(AcidOutputFormat.Options options) { + List names = new ArrayList<>(); + names.add(OPERATION, "OPERATION"); + names.add(ORIGINAL_TRANSACTION, "ORIGINAL_TRANSACTION"); + names.add(BUCKET, "BUCKET"); + names.add(ROW_ID, "ROW_ID"); + names.add(CURRENT_TRANSACTION, "CURRENT_TRANSACTION"); + List typeInfos = new ArrayList<>(); + typeInfos.add(OPERATION, TypeInfoFactory.intTypeInfo); + typeInfos.add(ORIGINAL_TRANSACTION, TypeInfoFactory.longTypeInfo); + typeInfos.add(BUCKET, TypeInfoFactory.intTypeInfo); + typeInfos.add(ROW_ID, TypeInfoFactory.longTypeInfo); + typeInfos.add(CURRENT_TRANSACTION, TypeInfoFactory.longTypeInfo); + + StructTypeInfo rIdStructTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(names, + typeInfos); + + ParquetStructObjectInspector eventInspector = new ParquetStructObjectInspector(rIdStructTypeInfo); + List fs = eventInspector.getFields(); + fs.add(eventInspector.buildStructField("ROW", options.getInspector(), fs.size())); + rowInspector = new ParquetStructObjectInspector(fs); + +// rowInspector = new ParquetStructObjectInspector(rIdStructTypeInfo); +// rowInspector.appendField("ROW", options.getInspector()); + + bucketVal = options.getBucket(); + item[OPERATION] = operationVal; + item[ORIGINAL_TRANSACTION] = originalTxnIDVal; + item[BUCKET] = bucketVal; + item[ROW_ID] = rowIDVal; + } + + public ObjectArrayWritable getRow(ObjectArrayWritable writable) { + return (ObjectArrayWritable) rowInspector.getStructFieldData(writable, rowInspector + .getStructFieldRef("ROW")); + } + + public Integer getBucket(ObjectArrayWritable writable) { + return (Integer) rowInspector.getStructFieldData(writable, rowInspector.getStructFieldRef + ("BUCKET")); + } + + public ParquetRecordUpdater(FileSinkOperator.RecordWriter realWriter, + AcidOutputFormat.Options options) throws IOException { + initialRowProps(options); + this.writer = realWriter; + } + + @Override + public void insert(long currentTransaction, 
Object row) throws IOException { + if (currentTransaction != currentTxnIDVal) { + insertedRows = 0; + } + operationVal = INSERT_OPERATION; + currentTxnIDVal = currentTransaction; + originalTxnIDVal = currentTransaction; + rowIDVal = insertedRows++; + item[ROW] = row; + ObjectArrayWritable writable = new ObjectArrayWritable(item); + ParquetHiveRecord record = new ParquetHiveRecord(writable, rowInspector); + writer.write(record); + } + + @Override + public void update(long currentTransaction, Object row) throws IOException { + // TODO + } + + @Override + public void delete(long currentTransaction, Object row) throws IOException { + // TODO + } + + @Override + public void flush() throws IOException { + close(false); + } + + @Override + public void close(boolean abort) throws IOException { + writer.close(abort); + } + + @Override + public SerDeStats getStats() { + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java index 43c772f..8daccd1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java @@ -42,7 +42,7 @@ public static MessageType convert(final List columnNames, final List columnNames, final List columnTypes) { + public static Type[] convertTypes(final List columnNames, final List columnTypes) { if (columnNames.size() != columnTypes.size()) { throw new IllegalStateException("Mismatched Hive columns and types. Hive columns names" + " found : " + columnNames + " . And Hive types found : " + columnTypes); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index 0a5edbb..34213b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -34,6 +34,9 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -58,7 +61,7 @@ import com.google.common.base.Strings; -public class ParquetRecordReaderWrapper implements RecordReader { +public class ParquetRecordReaderWrapper implements RecordReader { public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class); private final long splitLen; // for getPos() @@ -131,7 +134,7 @@ public ParquetRecordReaderWrapper( eof = true; } if (valueObj == null) { // Should initialize the value for createValue - valueObj = new ObjectArrayWritable(new Object[schemaSize]); + valueObj = new ObjectArrayWritable(new Writable[schemaSize]); } } @@ -166,7 +169,7 @@ public void close() throws IOException { } @Override - public Void createKey() { + public NullWritable createKey() { return null; } @@ -194,7 +197,7 @@ public float getProgress() throws IOException { } @Override - public boolean next(final Void key, final ObjectArrayWritable value) throws IOException { + public boolean next(final NullWritable key, final ObjectArrayWritable value) throws 
IOException { if (eof) { return false; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java new file mode 100644 index 0000000..65172ad --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetStructObjectInspector.java @@ -0,0 +1,290 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.serde; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; + +/** + * The ArrayWritableObjectInspector will inspect an ArrayWritable, considering it as a Hive + * struct.
+ * It can also inspect a List if Hive decides to inspect the result of an inspection. + */ +public class ParquetStructObjectInspector extends SettableStructObjectInspector { + + private final List fieldNames; + private final List fields; + private final Map fieldsByName; + + public ParquetStructObjectInspector(final StructTypeInfo rowTypeInfo) { + fieldNames = rowTypeInfo.getAllStructFieldNames(); + final List fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); + fields = new ArrayList<>(fieldNames.size()); + fieldsByName = new HashMap<>(); + + for (int i = 0; i < fieldNames.size(); ++i) { + final String name = fieldNames.get(i); + final TypeInfo fieldInfo = fieldInfos.get(i); + + final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); + fields.add(field); + fieldsByName.put(name, field); + } + } + + public ParquetStructObjectInspector(List fields){ + fieldNames = new ArrayList<>(); + this.fields = fields; + fieldsByName = new HashMap<>(); + for(StructField field: fields){ + fieldsByName.put(field.getFieldName(), field); + fieldNames.add(field.getFieldName()); + } + } + + public List getFields(){ + return fields; + } + + public StructField buildStructField(String fieldName, ObjectInspector inspector, int index) { + return new StructFieldImpl(fieldName, inspector, index); + } + + private ObjectInspector getObjectInspector(final TypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaIntObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaLongObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetStringInspector; + } else if (typeInfo instanceof DecimalTypeInfo) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( + (DecimalTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.STRUCT)) { + return new ParquetStructObjectInspector((StructTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.LIST)) { + final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); + return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo)); + } else if (typeInfo.getCategory().equals(Category.MAP)) { + final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); + final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); + if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals + (TypeInfoFactory.byteTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo), + getObjectInspector(valueTypeInfo)); + } else { + return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo), + getObjectInspector(valueTypeInfo)); + } + } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetByteInspector; + } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return 
ParquetPrimitiveInspectorFactory.parquetShortInspector; + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableDateObjectInspector; + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((CharTypeInfo) + typeInfo); + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( + (VarcharTypeInfo) typeInfo); + } else { + throw new UnsupportedOperationException("Unknown field type: " + typeInfo); + } + + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public String getTypeName() { + StringBuilder buffer = new StringBuilder(); + buffer.append("struct<"); + for (int i = 0; i < fields.size(); ++i) { + StructField field = fields.get(i); + if (i != 0) { + buffer.append(","); + } + buffer.append(field.getFieldName()); + buffer.append(":"); + buffer.append(field.getFieldObjectInspector().getTypeName()); + } + buffer.append(">"); + return buffer.toString(); + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } + + @Override + public Object getStructFieldData(final Object data, final StructField fieldRef) { + if (data == null) { + return null; + } + + if (data instanceof ObjectArrayWritable) { + final ObjectArrayWritable arr = (ObjectArrayWritable) data; + return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; + } + + //since setStructFieldData and create return a list, getStructFieldData should be able to + //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde + //is something else. 
+ if (data instanceof List) { + return ((List) data).get(((StructFieldImpl) fieldRef).getIndex()); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public StructField getStructFieldRef(final String name) { + return fieldsByName.get(name); + } + + @Override + public List getStructFieldsDataAsList(final Object data) { + if (data == null) { + return null; + } + + if (data instanceof ObjectArrayWritable) { + final ObjectArrayWritable arr = (ObjectArrayWritable) data; + final Object[] arrObjects = arr.get(); + return Arrays.asList(arrObjects); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public Object create() { + final ArrayList list = new ArrayList(fields.size()); + for (int i = 0; i < fields.size(); ++i) { + list.add(null); + } + return list; + } + + @Override + public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { + final ArrayList list = (ArrayList) struct; + list.set(((StructFieldImpl) field).getIndex(), fieldValue); + return list; + } + + public boolean equals(Object o) { + if (o == null || o.getClass() != getClass()) { + return false; + } else if (o == this) { + return true; + } else { + List other = ((ParquetStructObjectInspector) o).fields; + if (other.size() != fields.size()) { + return false; + } + for (int i = 0; i < fields.size(); ++i) { + StructField left = other.get(i); + StructField right = fields.get(i); + if (!(left.getFieldName().equalsIgnoreCase(right.getFieldName()) && left + .getFieldObjectInspector().equals(right.getFieldObjectInspector()))) { + return false; + } + } + return true; + } + } + + @Override + public int hashCode() { + int result = fields.size(); + for (Object field : fields) { + if (field != null) { + result ^= field.hashCode(); + } + } + return result; + } + + class StructFieldImpl implements StructField { + + private final String name; + private final ObjectInspector inspector; + private final int index; + + public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { + this.name = name; + this.inspector = inspector; + this.index = index; + } + + @Override + public String getFieldComment() { + return ""; + } + + @Override + public String getFieldName() { + return name; + } + + public int getIndex() { + return index; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public int getFieldID() { + return index; + } + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java index 0d32e49..b7987a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java @@ -20,6 +20,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; @@ -35,12 +36,12 @@ import parquet.hadoop.metadata.CompressionCodecName; import parquet.hadoop.util.ContextUtil; -public class ParquetRecordWriterWrapper implements RecordWriter, +public class ParquetRecordWriterWrapper implements 
RecordWriter, org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter { public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class); - private final org.apache.hadoop.mapreduce.RecordWriter realWriter; + private final org.apache.hadoop.mapreduce.RecordWriter realWriter; private final TaskAttemptContext taskContext; public ParquetRecordWriterWrapper( @@ -106,7 +107,7 @@ public void close(final Reporter reporter) throws IOException { } @Override - public void write(final Void key, final ParquetHiveRecord value) throws IOException { + public void write(final NullWritable key, final ParquetHiveRecord value) throws IOException { try { realWriter.write(key, value); } catch (final InterruptedException e) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java index 5f7f597..698dca0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java @@ -15,6 +15,7 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; @@ -122,12 +123,12 @@ public static void assertEquals(String message, ObjectArrayWritable expected, public static List read(Path parquetFile) throws IOException { List records = new ArrayList(); - RecordReader reader = new MapredParquetInputFormat(). + RecordReader reader = new MapredParquetInputFormat(). 
getRecordReader(new FileSplit( parquetFile, 0, fileLength(parquetFile), (String[]) null), new JobConf(), null); - Void alwaysNull = reader.createKey(); + NullWritable alwaysNull = reader.createKey(); ObjectArrayWritable record = reader.createValue(); while (reader.next(alwaysNull, record)) { records.add(record); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java new file mode 100644 index 0000000..81ab33c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/acid/TestParquetRecordUpdater.java @@ -0,0 +1,193 @@ +package org.apache.hadoop.hive.ql.io.parquet.acid; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.parquet.AbstractTestParquetDirect; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter; +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; +import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapred.RecordReader; +import org.junit.Test; +import parquet.bytes.BytesInput; +import parquet.bytes.LittleEndianDataInputStream; +import parquet.column.page.DataPageV1; +import parquet.column.page.DataPageV2; +import parquet.column.page.PageReadStore; +import parquet.column.page.PageReader; +import parquet.format.ColumnChunk; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.ParquetInputSplit; +import 
parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.ParquetRecordReader; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.api.InitContext; +import parquet.hadoop.api.ReadSupport; +import parquet.hadoop.api.WriteSupport; +import parquet.hadoop.metadata.BlockMetaData; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.io.MessageColumnIO; +import parquet.io.api.RecordConsumer; +import parquet.schema.MessageType; +import parquet.schema.MessageTypeParser; + +import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +public class TestParquetRecordUpdater { + private static final Path tmpDir = new Path(System.getProperty("test.tmp.dir", "target" + File + .separator + "test" + File.separator + "tmp")); + + final static List values = Arrays.asList("first", "second", "third", "fourth", "fifth", + "sixth", "seventh", "eighth", "ninth", "tenth"); + + static class MyRow { + Text col1; + Text col2; + + MyRow(String val1, String val2) { + col1 = new Text(val1); + col2 = new Text(val2); + } + } + + @Test + public void testCreateEventSchema() { + MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat(); + JobConf jobConf = new JobConf(); + Properties tableProperties = new Properties(); + String columnTypeProperty = "string:string"; + String columnNameProperty = "a,b"; + tableProperties.setProperty(IOConstants.COLUMNS, columnNameProperty); + tableProperties.setProperty(IOConstants.COLUMNS_TYPES, columnTypeProperty); + outputFormat.createEventSchema(jobConf, tableProperties); + String eventSchema = jobConf.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA); + MessageType realMsgType = MessageTypeParser.parseMessageType(eventSchema); + String s = "message hive_schema {\n" + + " optional int32 operation;\n" + + " optional int64 orignalTxnID;\n" + + " optional int32 BUCKET;\n" + + " optional int64 rowID;\n" + + " optional int64 currentTxnID;\n" + + " optional group ROW {\n" + + " optional binary a (UTF8);\n" + + " optional binary b (UTF8);\n" + + " }\n" + + "}"; + MessageType expectedType = MessageTypeParser.parseMessageType(s); + Assert.assertEquals(expectedType, realMsgType); + } + + @Test + public void testInsert() throws IOException { + final int BUCKET = 10; + Configuration conf = new JobConf(); + Properties tblProps = new Properties(); + tblProps.setProperty(IOConstants.COLUMNS, "val1,val2"); + tblProps.setProperty(IOConstants.COLUMNS_TYPES, "string:string"); + tblProps.setProperty(ParquetOutputFormat.ENABLE_DICTIONARY, "false"); + FileSystem fs = FileSystem.getLocal(conf); + + MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat(); + + // initial inspector + ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + + // write the base + final AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).inspector + (inspector).bucket(BUCKET).tableProperties(tblProps); + + Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs); + fs.delete(root, true); + + String inputPath = ""; + int numOfSplit = 10; + Map rus = new HashMap<>(); + for (int i = 0; i < numOfSplit; i++) { + String pathName = root.getName() + i + "_" + (i + 1); + System.out.println("The " + i + " writer with path name is " + pathName); + Path p = new Path(pathName); + if (i != (numOfSplit - 1)) { + inputPath += p.getName() + ","; + } + fs.delete(p, true); + RecordUpdater ru = 
outputFormat.getRecordUpdater(p, options.maximumTransactionId(100)); + for (int j = 0; j < values.size(); j++) { + ru.insert(0, new MyRow(values.get(j), values.get((j + 1) % values.size()))); + } + ru.flush(); + rus.put(p.getName(), ru); + } + + JobConf jobConf = new JobConf(conf); + jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, inputPath); + MapredParquetInputFormat inputFormat = new MapredParquetInputFormat(); + InputSplit[] splits = inputFormat.getSplits(jobConf, numOfSplit); + + for (InputSplit s : splits) { + RecordReader recordReader = inputFormat.getRecordReader(s, jobConf, null); + ObjectArrayWritable value = new ObjectArrayWritable(new Object[6]); + boolean hasMore; + int i = 0; + do { + hasMore = recordReader.next(null, value); + RecordUpdater ru = rus.get(((FileSplit) s).getPath().getName()); + ObjectArrayWritable myRow = ((ParquetRecordUpdater) ru).getRow(value); + if (myRow != null && myRow.get() != null) { + int bucketID = ((ParquetRecordUpdater) ru).getBucket(value); + Assert.assertEquals(bucketID, BUCKET); + Text col1 = (Text) myRow.get()[0]; + Assert.assertEquals(values.get((values.indexOf((col1.toString())) + 1) % values.size()) + , myRow.get()[1].toString()); + } + } while (hasMore); + } + + for (String p : rus.keySet()) { + fs.delete(new Path(p), true); + } + } +} diff --git a/ql/src/test/queries/clientpositive/acid_parquet_insert.q b/ql/src/test/queries/clientpositive/acid_parquet_insert.q new file mode 100644 index 0000000..92a76d5 --- /dev/null +++ b/ql/src/test/queries/clientpositive/acid_parquet_insert.q @@ -0,0 +1,10 @@ +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +set hive.enforce.bucketing=true; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.compactor.initiator.on=true; + +CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true"); + +INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300); +INSERT INTO TABLE numbers_bucketed VALUES(10, 1000); \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/acid_parquet_insert.q.out b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out new file mode 100644 index 0000000..1445df8 --- /dev/null +++ b/ql/src/test/results/clientpositive/acid_parquet_insert.q.out @@ -0,0 +1,28 @@ +PREHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@numbers_bucketed +POSTHOOK: query: CREATE TABLE numbers_bucketed (a int, b int) CLUSTERED BY (a) INTO 2 BUCKETS STORED AS Parquet TBLPROPERTIES("transactional"="true") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@numbers_bucketed +PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@numbers_bucketed +POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(1, 100),(2, 200),(3, 300) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@numbers_bucketed +POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@numbers_bucketed +POSTHOOK: query: INSERT INTO TABLE numbers_bucketed VALUES(10, 1000) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@numbers_bucketed +POSTHOOK: Lineage: numbers_bucketed.a EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: numbers_bucketed.b EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
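
Note on the write path: the scenario covered by TestParquetRecordUpdater boils down to the usage below of MapredParquetOutputFormat through the AcidOutputFormat API. This is an illustrative sketch, not additional patch code; the path, bucket, transaction id and the MyRow helper are example values modeled on the test.

    // Illustrative only: write ACID event rows through the new ParquetRecordUpdater.
    JobConf conf = new JobConf();
    Properties tblProps = new Properties();
    tblProps.setProperty(IOConstants.COLUMNS, "val1,val2");
    tblProps.setProperty(IOConstants.COLUMNS_TYPES, "string:string");

    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .inspector(inspector)
        .bucket(10)                      // bucket id recorded in the BUCKET event column
        .tableProperties(tblProps)
        .maximumTransactionId(100);

    MapredParquetOutputFormat outputFormat = new MapredParquetOutputFormat();
    RecordUpdater updater = outputFormat.getRecordUpdater(new Path("/tmp/delta_example"), options);
    updater.insert(0, new MyRow("first", "second"));  // one 6-field event row per insert
    updater.close(false);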
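
The AcidInputFormat methods added to MapredParquetInputFormat are still TODO stubs. One plausible shape for validateInput, sketched here purely as an assumption (it is not in the patch), is to accept only non-empty files that end with the 4-byte Parquet magic "PAR1":

    // Hypothetical validateInput body; needs java.nio.charset.StandardCharsets,
    // java.util.Arrays and org.apache.hadoop.fs.FSDataInputStream in scope.
    @Override
    public boolean validateInput(FileSystem fs, HiveConf conf,
                                 ArrayList<FileStatus> files) throws IOException {
      final byte[] magic = "PAR1".getBytes(StandardCharsets.US_ASCII);
      for (FileStatus file : files) {
        if (file.isDirectory() || file.getLen() < magic.length) {
          return false;
        }
        byte[] tail = new byte[magic.length];
        try (FSDataInputStream in = fs.open(file.getPath())) {
          // The Parquet footer ends with the magic bytes; read the last four bytes.
          in.readFully(file.getLen() - magic.length, tail, 0, magic.length);
        }
        if (!Arrays.equals(magic, tail)) {
          return false;
        }
      }
      return !files.isEmpty();
    }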
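
ParquetRawRecordMerger.ReaderKey documents the intended merge ordering (originalTransaction, bucket and rowId ascending, currentTransaction descending) but does not override compareTo yet. Assuming RecordIdentifier.compareTo already handles the ascending part, as the ORC merger's key does, the ordering described in the javadoc would look roughly like:

    // Hypothetical compareTo for ReaderKey; not part of the patch.
    @Override
    public int compareTo(RecordIdentifier other) {
      int result = super.compareTo(other);   // originalTransaction, bucket, rowId ascending
      if (result == 0 && other instanceof ReaderKey) {
        // Newer transactions sort first, so a collapsing reader only keeps the first
        // (latest) version of each row it encounters.
        result = Long.compare(((ReaderKey) other).currentTransactionId, currentTransactionId);
      }
      return result;
    }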