diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
index b2abc5fbb3670893415354552239d67d072459ed..d547a2b73503027b00775412ce173a01bf93194c 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
@@ -27,6 +27,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -117,10 +118,10 @@ public void write(WritableComparable key, HCatRecord value) throws IOExceptio
       value.remove(colToDel);
     }
 
-    // The key given by user is ignored
     try {
-      localWriter.write(NullWritable.get(),
-          localSerDe.serialize(value.getAll(), localObjectInspector));
+      // The key given by user is ignored - in case of Parquet we need to supply null
+      Object keyToWrite = localWriter instanceof ParquetRecordWriterWrapper ? null : NullWritable.get();
+      localWriter.write(keyToWrite, localSerDe.serialize(value.getAll(), localObjectInspector));
     } catch (SerDeException e) {
       throw new IOException("Failed to serialize object", e);
     }
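
A note on the hunk above: per the new inline comment, the user-supplied key is ignored for every format, but the Parquet writer must be handed a literal null rather than NullWritable.get(), so the container now dispatches on the writer type. A minimal, self-contained sketch of that dispatch in isolation — the Writer interface and ParquetLikeWriter class below are hypothetical stand-ins for Hadoop's RecordWriter and ParquetRecordWriterWrapper:

import org.apache.hadoop.io.NullWritable;

public class KeyDispatchSketch {
  /** Hypothetical stand-in for org.apache.hadoop.mapred.RecordWriter. */
  interface Writer {
    void write(Object key, Object value);
  }

  /** Hypothetical stand-in for ParquetRecordWriterWrapper: accepts only a null key. */
  static class ParquetLikeWriter implements Writer {
    @Override
    public void write(Object key, Object value) {
      if (key != null) {
        throw new IllegalArgumentException("Parquet-style writer expects a null key");
      }
      System.out.println("wrote " + value);
    }
  }

  public static void main(String[] args) {
    Writer writer = new ParquetLikeWriter();
    // Mirrors the patched logic in FileRecordWriterContainer.write():
    Object key = writer instanceof ParquetLikeWriter ? null : NullWritable.get();
    writer.write(key, "row-1"); // succeeds: the key is null for the Parquet-like writer
  }
}
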
diff --git hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
index 60af5c0bf397273fb820f0ee31e578745dbc200f..6d82ef99d7f9700658d7f8c4ff7502d6440b5265 100644
--- hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
+++ hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
@@ -24,6 +24,10 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.orc.OrcConf;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
@@ -38,6 +42,8 @@
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.collect.Maps;
+
 /**
  * This class is a place to put all the code associated with
  * Special cases. If there is a corner case required to make
@@ -120,6 +126,27 @@ public static void addSpecialCasesParametersToOutputJobProperties(
       }
 
 
+    } else if (ofclass == MapredParquetOutputFormat.class) {
+      //Handle table properties
+      Properties tblProperties = new Properties();
+      Map<String, String> tableProps = jobInfo.getTableInfo().getTable().getParameters();
+      for (String key : tableProps.keySet()) {
+        if (ParquetTableUtils.isParquetProperty(key)) {
+          tblProperties.put(key, tableProps.get(key));
+        }
+      }
+
+      //Handle table schema
+      List<String> colNames = jobInfo.getOutputSchema().getFieldNames();
+      List<TypeInfo> colTypes = new ArrayList<TypeInfo>();
+      for (HCatFieldSchema field : jobInfo.getOutputSchema().getFields()) {
+        colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getTypeString()));
+      }
+      String parquetSchema = HiveSchemaConverter.convert(colNames, colTypes).toString();
+      jobProperties.put(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA, parquetSchema);
+
+      jobProperties.putAll(Maps.fromProperties(tblProperties));
+
     }
   }
 
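The value stored under DataWritableWriteSupport.PARQUET_HIVE_SCHEMA above is the string form of a Parquet MessageType derived from the HCatalog output schema. A small sketch of that conversion in isolation, assuming Hive 2.x package names; the column names and the exact rendered schema text are illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.parquet.schema.MessageType;

public class SchemaConvertSketch {
  public static void main(String[] args) {
    List<String> colNames = Arrays.asList("id", "name");
    List<TypeInfo> colTypes = Arrays.asList(
        TypeInfoUtils.getTypeInfoFromTypeString("int"),
        TypeInfoUtils.getTypeInfoFromTypeString("string"));

    MessageType schema = HiveSchemaConverter.convert(colNames, colTypes);
    // Expect output along these lines:
    //   message hive_schema {
    //     optional int32 id;
    //     optional binary name (UTF8);
    //   }
    System.out.println(schema.toString());
  }
}
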
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java
index 4c686fec596d39d41d458bc3ea2753877bd9df98..ea9cdda31cb092edf1bc7d2fd4c4b1f45906e3f6 100644
--- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java
+++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java
@@ -77,9 +77,6 @@
         add("testMapNullKey");
       }});
       put(IOConstants.PARQUETFILE, new HashSet<String>() {{
-        add("testSyntheticComplexSchema");
-        add("testTupleInBagInTupleInBag");
-        add("testMapWithComplexData");
         add("testMapNullKey");
       }});
     }};
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
index ad11eab1b7e67541b56e90e4a85ba37b41a4db92..903578b2792df1c0602db3a64ecb6292084f94dd 100644
--- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
+++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
@@ -101,14 +101,7 @@
   private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
   private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
-      new HashMap<String, Set<String>>() {{
-        put(IOConstants.PARQUETFILE, new HashSet<String>() {{
-          add("testReadDataBasic");
-          add("testReadPartitionedBasic");
-          add("testProjectionsBasic");
-          add("testReadDataFromEncryptedHiveTable");
-        }});
-      }};
+      new HashMap<String, Set<String>>();
 
   private String storageFormat;
 
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java
index 918332ddfda58306707d326f8668b2c223110a29..40ea923858690cf4bd7f10896bb1083fd6e3b0ea 100644
--- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java
+++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,7 +32,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.StorageFormats;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
@@ -67,13 +65,7 @@
   private static Map<Integer, Pair<Integer, String>> basicInputData;
 
   private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
-      new HashMap<String, Set<String>>() {{
-        put(IOConstants.PARQUETFILE, new HashSet<String>() {{
-          add("testStoreBasicTable");
-          add("testStorePartitionedTable");
-          add("testStoreTableMulti");
-        }});
-      }};
+      new HashMap<String, Set<String>>();
 
   private final String storageFormat;
 
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
index 6cd382145b55d6b85fc3366faeaba2aaef65ab04..082a6c2a57c920343c432f50c91bf0e3ff43509c 100644
--- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
+++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
@@ -33,53 +33,4 @@
   String getStorageFormat() {
     return IOConstants.PARQUET;
   }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadDataBasic() throws IOException {
-    super.testReadDataBasic();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
-    super.testReadPartitionedBasic();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testProjectionsBasic() throws IOException {
-    super.testProjectionsBasic();
-  }
-
-  /**
-   * Tests the failure case caused by HIVE-10752
-   * @throws Exception
-   */
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testColumnarStorePushdown2() throws Exception {
-    super.testColumnarStorePushdown2();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
-    super.testReadMissingPartitionBasicNeg();
-  }
-
-  /**
-   * Test if we can read a date partitioned table
-   */
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testDatePartitionPushUp() throws Exception {
-    super.testDatePartitionPushUp();
-  }
 }
diff --git hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
index 6dfdc04954dd0b110b1a7194e69468b5dc2f842e..1f67e21ecd543b9702577a11eff185aab444bf53 100644
--- hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
+++ hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
@@ -18,12 +18,8 @@
  */
 package org.apache.hive.hcatalog.pig;
 
-import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,167 +30,4 @@
   String getStorageFormat() {
     return IOConstants.PARQUETFILE;
   }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testBagNStruct() throws IOException, CommandNeedRetryException {
-    super.testBagNStruct();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDateCharTypes() throws Exception {
-    super.testDateCharTypes();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException,
-      CommandNeedRetryException {
-    super.testDynamicPartitioningMultiPartColsInDataNoSpec();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException,
-      CommandNeedRetryException {
-    super.testDynamicPartitioningMultiPartColsInDataPartialSpec();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testMultiPartColsInData() throws Exception {
-    super.testMultiPartColsInData();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testPartColsInData() throws IOException, CommandNeedRetryException {
-    super.testPartColsInData();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException {
-    super.testStoreFuncAllSimpleTypes();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreFuncSimple() throws IOException, CommandNeedRetryException {
-    super.testStoreFuncSimple();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreInPartiitonedTbl() throws Exception {
-    super.testStoreInPartiitonedTbl();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreMultiTables() throws IOException, CommandNeedRetryException {
-    super.testStoreMultiTables();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreWithNoCtorArgs() throws IOException, CommandNeedRetryException {
-    super.testStoreWithNoCtorArgs();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreWithNoSchema() throws IOException, CommandNeedRetryException {
-    super.testStoreWithNoSchema();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteChar() throws Exception {
-    super.testWriteChar();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate() throws Exception {
-    super.testWriteDate();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate2() throws Exception {
-    super.testWriteDate2();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate3() throws Exception {
-    super.testWriteDate3();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimal() throws Exception {
-    super.testWriteDecimal();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimalX() throws Exception {
-    super.testWriteDecimalX();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimalXY() throws Exception {
-    super.testWriteDecimalXY();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteSmallint() throws Exception {
-    super.testWriteSmallint();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteTimestamp() throws Exception {
-    super.testWriteTimestamp();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteTinyint() throws Exception {
-    super.testWriteTinyint();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteVarchar() throws Exception {
-    super.testWriteVarchar();
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 379a9135d9c631b2f473976b00f3dc87f9fec0c4..dd8247c008dc57f8675ef09e0e5c7d09892b4315 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -68,6 +68,15 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
     realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
   }
 
+  /**
+   *
+   * @param ignored Unused parameter
+   * @param job JobConf - expecting mandatory parameter PARQUET_HIVE_SCHEMA
+   * @param name Path to write to
+   * @param progress Progress
+   * @return
+   * @throws IOException
+   */
   @Override
   public RecordWriter getRecordWriter(
       final FileSystem ignored,
@@ -75,7 +84,7 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
       final String name,
       final Progressable progress
       ) throws IOException {
-    throw new RuntimeException("Should never be used");
+    return new ParquetRecordWriterWrapper(realOutputFormat, job, name, progress);
   }
 
   /**
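
With the RuntimeException gone, the mapred-style getRecordWriter is a real entry point, but it only works when PARQUET_HIVE_SCHEMA is already present in the JobConf — which is exactly what the SpecialCases change earlier in this patch arranges for HCatalog jobs. A sketch of that contract; the schema literal is illustrative, and the commented-out call would additionally need a live task context to actually produce a file:

import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.mapred.JobConf;

public class GetRecordWriterContractSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Mandatory parameter: the wrapper resolves its write support from this.
    job.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA,
        "message hive_schema { optional int32 id; optional binary name (UTF8); }");

    // Inside a running task, HCatalog's FileRecordWriterContainer can now obtain
    // a working writer via (sketch only, fs/outputName/progress supplied by the task):
    // RecordWriter writer = new MapredParquetOutputFormat()
    //     .getRecordWriter(fs, job, outputName, progress);
  }
}
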
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..cb3b16c7b69f25d2b5490ade8b1d048d5d7076c3
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetTableUtils.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.serde;
+
+public class ParquetTableUtils {
+
+  public static boolean isParquetProperty(String key) {
+    return key.startsWith("parquet.");
+  }
+
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
index c021dafa480e65d7c0c19a5a85988464112468cb..af9393ef6876a6e9adfae4376ea165654bc4d380 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
@@ -14,12 +14,15 @@
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -72,6 +75,27 @@ public ParquetRecordWriterWrapper(
     }
   }
 
+  public ParquetRecordWriterWrapper(
+      final ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
+      final JobConf jobConf,
+      final String name,
+      final Progressable progress) throws IOException {
+    this(realOutputFormat, jobConf, name, progress, getParquetProperties(jobConf));
+  }
+
+  private static Properties getParquetProperties(JobConf jobConf) {
+    Properties tblProperties = new Properties();
+    Iterator<Map.Entry<String, String>> it = jobConf.iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, String> entry = it.next();
+      if (ParquetTableUtils.isParquetProperty(entry.getKey())) {
+        tblProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return tblProperties;
+  }
+
+
   private void initializeSerProperties(JobContext job, Properties tableProperties) {
     String blockSize = tableProperties.getProperty(ParquetOutputFormat.BLOCK_SIZE);
     Configuration conf = ContextUtil.getConfiguration(job);
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
index ec85b5df0f95cbd45b87259346ae9c1e5aa604a4..a9086bafbfb19938ea5c6aae18743eead83dd66d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
@@ -44,16 +44,6 @@ public void testConstructorWithFormat() {
     new MapredParquetOutputFormat((ParquetOutputFormat) mock(ParquetOutputFormat.class));
   }
 
-  @Test
-  public void testGetRecordWriterThrowsException() {
-    try {
-      new MapredParquetOutputFormat().getRecordWriter(null, null, null, null);
-      fail("should throw runtime exception.");
-    } catch (Exception e) {
-      assertEquals("Should never be used", e.getMessage());
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Test
   public void testGetHiveRecordWriter() throws IOException {
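
Taken together, the property flow in this patch is: table properties are filtered for parquet.* keys by SpecialCases into jobProperties, HCatalog copies jobProperties into the JobConf, and the new ParquetRecordWriterWrapper constructor filters them back out of the JobConf into the Properties object the pre-existing constructor already expects. A minimal, self-contained illustration of that filter using plain JDK types; the property names are examples, not a definitive list:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class ParquetPropertyFilterSketch {
  // Mirrors ParquetTableUtils.isParquetProperty from the patch.
  static boolean isParquetProperty(String key) {
    return key.startsWith("parquet.");
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("parquet.compression", "SNAPPY");   // example Parquet key, kept
    conf.put("parquet.block.size", "134217728"); // example Parquet key, kept
    conf.put("hive.exec.scratchdir", "/tmp");    // unrelated key, filtered out

    Properties parquetOnly = new Properties();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      if (isParquetProperty(e.getKey())) {
        parquetOnly.put(e.getKey(), e.getValue());
      }
    }
    System.out.println(parquetOnly); // contains only the parquet.* entries
  }
}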