diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
index 379a913..e2b8693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -19,6 +19,10 @@
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,12 +74,71 @@ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws
 
   @Override
   public RecordWriter getRecordWriter(
-      final FileSystem ignored,
-      final JobConf job,
-      final String name,
-      final Progressable progress
-      ) throws IOException {
-    throw new RuntimeException("Should never be used");
+      final FileSystem ignored,
+      final JobConf job,
+      final String name,
+      final Progressable progress
+      ) throws IOException {
+    // Added to support the Parquet format in HCatalog.
+    // RecordWriter that delays instantiation of realWriter until the first time write() is called.
+    return new FirstUseRecordWriter(job, name, realOutputFormat);
+  }
+
+  private static class FirstUseRecordWriter implements RecordWriter<NullWritable, ParquetHiveRecord> {
+    private String savedName;
+    private ParquetOutputFormat savedRealOutputFormat;
+    private TaskAttemptContext taskContext;
+
+    private org.apache.hadoop.mapreduce.RecordWriter realWriter;
+
+    FirstUseRecordWriter(JobConf jc, String name, ParquetOutputFormat outputFormat) {
+      savedRealOutputFormat = outputFormat;
+      savedName = name;
+
+      // create a TaskAttemptContext for the real (mapreduce) output format
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
+      if (taskAttemptID == null) {
+        taskAttemptID = new TaskAttemptID();
+      }
+      taskContext = ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(jc, taskAttemptID);
+    }
+
+    @Override
+    public void write(final NullWritable key, final ParquetHiveRecord value) throws IOException {
+      try {
+        if (realWriter == null) {
+          realWriter = init();
+        }
+        realWriter.write(null, value); // The ParquetRecordWriter ignores the key
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("write failed", e);
+      } catch (IOException ioe) {
+        throw new RuntimeException("write failed", ioe);
+      }
+    }
+
+    public void close(Reporter reporter) {
+      try {
+        if (realWriter != null) {
+          realWriter.close(taskContext);
+        }
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("close failed", e);
+      } catch (IOException ioe) {
+        throw new RuntimeException("close failed", ioe);
+      }
+    }
+
+    private org.apache.hadoop.mapreduce.RecordWriter init() {
+      try {
+        return (org.apache.hadoop.mapreduce.RecordWriter) savedRealOutputFormat
+            .getRecordWriter(taskContext, new Path(savedName));
+      } catch (final IOException e) {
+        throw new RuntimeException("init failed", e);
+      } catch (final InterruptedException e) {
+        throw new RuntimeException("init failed", e);
+      }
+    }
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
index a800991..7c6767f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
@@ -45,6 +45,9 @@
 import org.apache.hadoop.io.Writable;
 
 import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+
 /**
  *
  * A ParquetHiveSerDe for Hive (with the deprecated package mapred)
@@ -116,6 +119,12 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
         "name and column type differs. columnNames = " + columnNames +
         ", columnTypes = " + columnTypes);
     }
+
+    // SerDe responsible for passing table properties to RecordWriter
+    if (conf != null) {
+      DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), conf);
+    }
+
     // Create row related objects
     StructTypeInfo completeTypeInfo =
         (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
index ec85b5d..4bf47e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestMapredParquetOutputFormat.java
@@ -25,8 +25,11 @@
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
 import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.util.Progressable;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -45,13 +48,11 @@ public void testConstructorWithFormat() {
   }
 
   @Test
-  public void testGetRecordWriterThrowsException() {
-    try {
-      new MapredParquetOutputFormat().getRecordWriter(null, null, null, null);
-      fail("should throw runtime exception.");
-    } catch (Exception e) {
-      assertEquals("Should never be used", e.getMessage());
-    }
+  public void testGetRecordWriterNotThrowException() throws IOException {
+    JobConf jobConf = new JobConf();
+    RecordWriter recordWriter =
+        new MapredParquetOutputFormat().getRecordWriter(null, jobConf, null, null);
+    assertNotNull(recordWriter);
   }
 
   @SuppressWarnings("unchecked")
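
For reference, a minimal sketch (not part of the patch) of how the new lazily-initialized writer is exercised through the plain mapred API, the way the updated unit test or an HCatalog-style caller would use it. The task id and output file name below are made-up example values, and an actual write additionally assumes the ParquetHiveSerDe change above has already published the table schema into the Configuration via DataWritableWriteSupport.setSchema().

import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;

public class ParquetMapredWriteSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    // FirstUseRecordWriter builds its TaskAttemptContext from this property;
    // when it is absent, a fresh TaskAttemptID is created instead.
    job.set("mapred.task.id", "attempt_200707121733_0003_m_000005_0"); // example value

    // No longer throws "Should never be used"; the real ParquetRecordWriter is
    // not created here but on the first write() call.
    RecordWriter writer = new MapredParquetOutputFormat()
        .getRecordWriter(null, job, "/tmp/parquet-out/part-00000", Reporter.NULL);

    // writer.write(NullWritable.get(), someParquetHiveRecord); // would trigger init()
    writer.close(Reporter.NULL); // no-op here because realWriter was never initialized
  }
}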