diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
index 6cd3821..a3db682 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatLoader.java
@@ -18,12 +18,7 @@
  */
 package org.apache.hive.hcatalog.pig;
 
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,53 +28,4 @@
   String getStorageFormat() {
     return IOConstants.PARQUET;
   }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadDataBasic() throws IOException {
-    super.testReadDataBasic();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadPartitionedBasic() throws IOException, CommandNeedRetryException {
-    super.testReadPartitionedBasic();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testProjectionsBasic() throws IOException {
-    super.testProjectionsBasic();
-  }
-
-  /**
-   * Tests the failure case caused by HIVE-10752
-   * @throws Exception
-   */
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testColumnarStorePushdown2() throws Exception {
-    super.testColumnarStorePushdown2();
-  }
-
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
-    super.testReadMissingPartitionBasicNeg();
-  }
-
-  /**
-   * Test if we can read a date partitioned table
-   */
-  @Override
-  @Test
-  @Ignore("Temporarily disable until fixed")
-  public void testDatePartitionPushUp() throws Exception {
-    super.testDatePartitionPushUp();
-  }
 }
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
index 6dfdc04..1b97af1 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestParquetHCatStorer.java
@@ -18,12 +18,7 @@
  */
 package org.apache.hive.hcatalog.pig;
 
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,167 +29,4 @@
   String getStorageFormat() {
     return IOConstants.PARQUETFILE;
   }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testBagNStruct() throws IOException, CommandNeedRetryException {
-    super.testBagNStruct();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDateCharTypes() throws Exception {
-    super.testDateCharTypes();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException,
-      CommandNeedRetryException {
-    super.testDynamicPartitioningMultiPartColsInDataNoSpec();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException,
-      CommandNeedRetryException {
-    super.testDynamicPartitioningMultiPartColsInDataPartialSpec();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testMultiPartColsInData() throws Exception {
-    super.testMultiPartColsInData();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testPartColsInData() throws IOException, CommandNeedRetryException {
-    super.testPartColsInData();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreFuncAllSimpleTypes() throws IOException, CommandNeedRetryException {
-    super.testStoreFuncAllSimpleTypes();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreFuncSimple() throws IOException, CommandNeedRetryException {
-    super.testStoreFuncSimple();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreInPartiitonedTbl() throws Exception {
-    super.testStoreInPartiitonedTbl();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreMultiTables() throws IOException, CommandNeedRetryException {
-    super.testStoreMultiTables();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreWithNoCtorArgs() throws IOException, CommandNeedRetryException {
-    super.testStoreWithNoCtorArgs();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testStoreWithNoSchema() throws IOException, CommandNeedRetryException {
-    super.testStoreWithNoSchema();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteChar() throws Exception {
-    super.testWriteChar();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate() throws Exception {
-    super.testWriteDate();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate2() throws Exception {
-    super.testWriteDate2();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDate3() throws Exception {
-    super.testWriteDate3();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimal() throws Exception {
-    super.testWriteDecimal();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimalX() throws Exception {
-    super.testWriteDecimalX();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteDecimalXY() throws Exception {
-    super.testWriteDecimalXY();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteSmallint() throws Exception {
-    super.testWriteSmallint();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteTimestamp() throws Exception {
-    super.testWriteTimestamp();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteTinyint() throws Exception {
-    super.testWriteTinyint();
-  }
-
-  @Test
-  @Override
-  @Ignore("Temporarily disable until fixed")
-  public void testWriteVarchar() throws Exception {
-    super.testWriteVarchar();
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 379a913..e2b8693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -19,6 +19,10 @@
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,12 +74,71 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
 
   @Override
   public RecordWriter getRecordWriter(
-      final FileSystem ignored,
-      final JobConf job,
-      final String name,
-      final Progressable progress
-      ) throws IOException {
-    throw new RuntimeException("Should never be used");
+    final FileSystem ignored,
+    final JobConf job,
+    final String name,
+    final Progressable progress
+    ) throws IOException {
+    // Added to support the Parquet format in HCatalog.
+    // Returns a RecordWriter that delays instantiation of realWriter until write() is first called.
+    return new FirstUseRecordWriter(job, name, realOutputFormat);
+  }
+
+  private static class FirstUseRecordWriter implements RecordWriter<NullWritable, ParquetHiveRecord> {
+    private String savedName;
+    private ParquetOutputFormat savedRealOutputFormat;
+    private TaskAttemptContext taskContext;
+
+    private org.apache.hadoop.mapreduce.RecordWriter realWriter;
+
+    FirstUseRecordWriter(JobConf jc, String name, ParquetOutputFormat outputFormat) {
+      savedRealOutputFormat = outputFormat;
+      savedName = name;
+
+      // Create a TaskAttemptContext for the lazily instantiated writer.
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
+      if (taskAttemptID == null) {
+        taskAttemptID = new TaskAttemptID();
+      }
+      taskContext = ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(jc, taskAttemptID);
+    }
+
+    @Override
+    public void write(final NullWritable key, final ParquetHiveRecord value) throws IOException {
+      try {
+        if (realWriter == null) {
+          realWriter = init();
+        }
+        realWriter.write(null, value); // The ParquetRecordWriter ignores the key.
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("write failed", e);
+      } catch (IOException ioe) {
+        throw new RuntimeException("write failed", ioe);
+      }
+    }
+
+    public void close(Reporter reporter) {
+      try {
+        if (realWriter != null) {
+          realWriter.close(taskContext);
+        }
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("close failed", e);
+      } catch (IOException ioe) {
+        throw new RuntimeException("close failed", ioe);
+      }
+    }
+
+    private org.apache.hadoop.mapreduce.RecordWriter init() {
+      try {
+        return (org.apache.hadoop.mapreduce.RecordWriter) savedRealOutputFormat
+            .getRecordWriter(taskContext, new Path(savedName));
+      } catch (final IOException e) {
+        throw new RuntimeException("init failed", e);
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("init failed", e);
+      }
+    }
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index a800991..7c6767f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -45,6 +45,9 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+
 /**
  *
  * A ParquetHiveSerDe for Hive (with the deprecated package mapred)
@@ -116,6 +119,12 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
         "name and column type differs. columnNames = " + columnNames + ", columnTypes = " +
         columnTypes);
     }
+
+    // The SerDe is responsible for passing the table schema to the RecordWriter.
+    if (conf != null) {
+      DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), conf);
+    }
+
     // Create row related objects
     StructTypeInfo completeTypeInfo =
         (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
index ec85b5d..4bf47e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
@@ -25,8 +25,11 @@
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.util.Progressable;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -45,13 +48,11 @@ public void testConstructorWithFormat() {
   }
 
   @Test
-  public void testGetRecordWriterThrowsException() {
-    try {
-      new MapredParquetOutputFormat().getRecordWriter(null, null, null, null);
-      fail("should throw runtime exception.");
-    } catch (Exception e) {
-      assertEquals("Should never be used", e.getMessage());
-    }
+  public void testGetRecordWriterNotThrowException() throws IOException {
+    JobConf jobConf = new JobConf();
+    RecordWriter recordWriter =
+        new MapredParquetOutputFormat().getRecordWriter(null, jobConf, null, null);
+    assertNotNull(recordWriter);
   }
 
   @SuppressWarnings("unchecked")
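
Note for reviewers: the sketch below is not part of the patch; it only illustrates how the pieces above are intended to interact. ParquetHiveSerDe.initialize() publishes the Parquet schema into the Configuration via DataWritableWriteSupport, and the FirstUseRecordWriter returned by MapredParquetOutputFormat.getRecordWriter() defers creating the real ParquetRecordWriter until the first write(), building its TaskAttemptContext from "mapred.task.id" (falling back to a fresh TaskAttemptID outside a real task). The column names and output path are invented for illustration, and the sketch assumes the patched Hive jars and Hadoop shims are on the classpath.

import java.util.Arrays;

import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;

public class LazyParquetWriterSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();

    // What ParquetHiveSerDe.initialize() now does: convert the Hive columns to a
    // Parquet MessageType and stash it in the Configuration for the write support.
    DataWritableWriteSupport.setSchema(
        HiveSchemaConverter.convert(
            Arrays.asList("id", "name"),                       // hypothetical columns
            Arrays.<TypeInfo>asList(TypeInfoFactory.intTypeInfo,
                TypeInfoFactory.stringTypeInfo)),
        job);

    // getRecordWriter() no longer throws "Should never be used"; it returns a
    // FirstUseRecordWriter that has not yet opened any output file.
    RecordWriter writer = new MapredParquetOutputFormat()
        .getRecordWriter(null, job, "/tmp/parquet-sketch/part-00000", null); // hypothetical path

    // A writer.write(key, record) here would trigger init() and create the real
    // ParquetRecordWriter; since nothing was written, realWriter is still null
    // and close() is a safe no-op.
    writer.close(null);
  }
}

The deferred instantiation is the crux of the change: the writer can be handed out before any task attempt exists (as the new testGetRecordWriterNotThrowException shows by passing null for the FileSystem and Progressable), and the expensive Parquet writer is only created once a record actually arrives.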