diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java index b2abc5fbb3..d547a2b735 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -117,10 +118,10 @@ public void write(WritableComparable key, HCatRecord value) throws IOExceptio value.remove(colToDel); } - // The key given by user is ignored try { - localWriter.write(NullWritable.get(), - localSerDe.serialize(value.getAll(), localObjectInspector)); + // The key given by user is ignored - in case of Parquet we need to supply null + Object keyToWrite = localWriter instanceof ParquetRecordWriterWrapper ? null : NullWritable.get(); + localWriter.write(keyToWrite, localSerDe.serialize(value.getAll(), localObjectInspector)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java index 60af5c0bf3..bd4be66681 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java @@ -24,6 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport; import org.apache.orc.OrcConf; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.serde2.avro.AvroSerDe; @@ -37,6 +41,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TimeZone; + +import com.google.common.collect.Maps; /** * This class is a place to put all the code associated with @@ -120,6 +127,27 @@ public static void addSpecialCasesParametersToOutputJobProperties( } + } else if (ofclass == MapredParquetOutputFormat.class) { + //Handle table properties + Properties tblProperties = new Properties(); + Map tableProps = jobInfo.getTableInfo().getTable().getParameters(); + for (String key : tableProps.keySet()) { + if (ParquetTableUtils.isParquetTblProperty(key)) { + tblProperties.put(key, tableProps.get(key)); + } + } + + //Handle table schema + List colNames = jobInfo.getOutputSchema().getFieldNames(); + List colTypes = new ArrayList(); + for (HCatFieldSchema field : jobInfo.getOutputSchema().getFields()){ + colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getTypeString())); + } + String parquetSchema = HiveSchemaConverter.convert(colNames, colTypes).toString(); + jobProperties.put(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA, parquetSchema); + + jobProperties.putAll(Maps.fromProperties(tblProperties)); + } } diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java index 4c686fec59..ea9cdda31c 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java @@ -77,9 +77,6 @@ add("testMapNullKey"); }}); put(IOConstants.PARQUETFILE, new HashSet() {{ - add("testSyntheticComplexSchema"); - add("testTupleInBagInTupleInBag"); - add("testMapWithComplexData"); add("testMapNullKey"); }}); }}; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java index ad11eab1b7..903578b279 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java @@ -101,14 +101,7 @@ private static List readRecords = new ArrayList(); private static final Map> DISABLED_STORAGE_FORMATS = - new HashMap>() {{ - put(IOConstants.PARQUETFILE, new HashSet() {{ - add("testReadDataBasic"); - add("testReadPartitionedBasic"); - add("testProjectionsBasic"); - add("testReadDataFromEncryptedHiveTable"); - }}); - }}; + new HashMap>(); private String storageFormat; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java index 918332ddfd..977343711e 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java @@ -67,13 +67,7 @@ private static Map> basicInputData; private static final Map> DISABLED_STORAGE_FORMATS = - new HashMap>() {{ - put(IOConstants.PARQUETFILE, new HashSet() {{ - add("testStoreBasicTable"); - add("testStorePartitionedTable"); - add("testStoreTableMulti"); - }}); - }}; + new HashMap>(); private final String storageFormat; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java index 6cd382145b..082a6c2a57 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java @@ -33,53 +33,4 @@ String getStorageFormat() { return IOConstants.PARQUET; } - - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testReadDataBasic() throws IOException { - super.testReadDataBasic(); - } - - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException { - super.testReadPartitionedBasic(); - } - - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testProjectionsBasic() throws IOException { - super.testProjectionsBasic(); - } - - /** - * Tests the failure case caused by HIVE-10752 - * @throws Exception - */ - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testColumnarStorePushdown2() throws Exception { - super.testColumnarStorePushdown2(); - } - - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException { - super.testReadMissingPartitionBasicNeg(); - } - - /** - * Test if we can read a date partitioned table - */ - @Override - @Test - @Ignore("Temporarily disable until fixed") - public void testDatePartitionPushUp() throws Exception { - super.testDatePartitionPushUp(); - } } diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java index 6dfdc04954..f02a031160 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java @@ -34,167 +34,4 @@ String getStorageFormat() { return IOConstants.PARQUETFILE; } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testBagNStruct() throws IOException, CommandNeedRetryException { - super.testBagNStruct(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testDateCharTypes() throws Exception { - super.testDateCharTypes(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, - CommandNeedRetryException { - super.testDynamicPartitioningMultiPartColsInDataNoSpec(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, - CommandNeedRetryException { - super.testDynamicPartitioningMultiPartColsInDataPartialSpec(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testMultiPartColsInData() throws Exception { - super.testMultiPartColsInData(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testPartColsInData() throws IOException, CommandNeedRetryException { - super.testPartColsInData(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException { - super.testStoreFuncAllSimpleTypes(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreFuncSimple() throws IOException, CommandNeedRetryException { - super.testStoreFuncSimple(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreInPartiitonedTbl() throws Exception { - super.testStoreInPartiitonedTbl(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreMultiTables() throws IOException, CommandNeedRetryException { - super.testStoreMultiTables(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreWithNoCtorArgs() throws IOException, CommandNeedRetryException { - super.testStoreWithNoCtorArgs(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testStoreWithNoSchema() throws IOException, CommandNeedRetryException { - super.testStoreWithNoSchema(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteChar() throws Exception { - super.testWriteChar(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDate() throws Exception { - super.testWriteDate(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDate2() throws Exception { - super.testWriteDate2(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDate3() throws Exception { - super.testWriteDate3(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDecimal() throws Exception { - super.testWriteDecimal(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDecimalX() throws Exception { - super.testWriteDecimalX(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteDecimalXY() throws Exception { - super.testWriteDecimalXY(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteSmallint() throws Exception { - super.testWriteSmallint(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteTimestamp() throws Exception { - super.testWriteTimestamp(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteTinyint() throws Exception { - super.testWriteTinyint(); - } - - @Test - @Override - @Ignore("Temporarily disable until fixed") - public void testWriteVarchar() throws Exception { - super.testWriteVarchar(); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java index 379a9135d9..e3ab994fb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java @@ -75,7 +75,7 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws final String name, final Progressable progress ) throws IOException { - throw new RuntimeException("Should never be used"); + return new ParquetRecordWriterWrapper(realOutputFormat, job, name, progress); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java new file mode 100644 index 0000000000..9a16b341a6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.serde; + +public class ParquetTableUtils { + + public static boolean isParquetTblProperty(String key) { + return key.startsWith("parquet."); + } + +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java index c021dafa48..4233d505a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java @@ -14,12 +14,15 @@ package org.apache.hadoop.hive.ql.io.parquet.write; import java.io.IOException; +import java.util.Iterator; +import java.util.Map; import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -33,6 +36,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.ParquetRecordWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ContextUtil; @@ -72,6 +76,27 @@ public ParquetRecordWriterWrapper( } } + public ParquetRecordWriterWrapper( + final ParquetOutputFormat realOutputFormat, + final JobConf jobConf, + final String name, + final Progressable progress) throws IOException { + this(realOutputFormat, jobConf, name, progress, retrieveTblProperties(jobConf)); + } + + private static Properties retrieveTblProperties(JobConf jobConf) { + Properties tblProperties = new Properties(); + Iterator> it = jobConf.iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (ParquetTableUtils.isParquetTblProperty(entry.getKey())) { + tblProperties.put(entry.getKey(), entry.getValue()); + } + } + return tblProperties; + } + + private void initializeSerProperties(JobContext job, Properties tableProperties) { String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE); Configuration conf = ContextUtil.getConfiguration(job); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java index ec85b5df0f..a9086bafbf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java @@ -44,16 +44,6 @@ public void testConstructorWithFormat() { new MapredParquetOutputFormat((ParquetOutputFormat) mock(ParquetOutputFormat.class)); } - @Test - public void testGetRecordWriterThrowsException() { - try { - new MapredParquetOutputFormat().getRecordWriter(null, null, null, null); - fail("should throw runtime exception."); - } catch (Exception e) { - assertEquals("Should never be used", e.getMessage()); - } - } - @SuppressWarnings("unchecked") @Test public void testGetHiveRecordWriter() throws IOException {