From 03e0c238147cd49746200fa68bca8d700dbd5c92 Mon Sep 17 00:00:00 2001 From: "David Z. Chen" Date: Tue, 17 Jun 2014 13:01:43 -0700 Subject: [PATCH] HIVE-4329: HCatalog should use getHiveRecordWriter. --- .../org/apache/hive/hcatalog/common/HCatUtil.java | 34 ++-- .../mapreduce/DefaultOutputFormatContainer.java | 27 ++- .../mapreduce/DefaultRecordWriterContainer.java | 51 +++-- .../DynamicPartitionFileRecordWriterContainer.java | 215 ++++++++++++--------- .../mapreduce/FileOutputFormatContainer.java | 60 +++--- .../mapreduce/FileRecordWriterContainer.java | 36 ++-- .../hcatalog/mapreduce/FosterStorageHandler.java | 12 +- .../hcatalog/mapreduce/HCatBaseOutputFormat.java | 36 ++-- .../hive/hcatalog/mapreduce/HCatMapRedUtil.java | 13 +- .../hive/hcatalog/mapreduce/HCatOutputFormat.java | 14 +- .../hive/hcatalog/mapreduce/InitializeInput.java | 4 +- .../hive/hcatalog/mapreduce/InternalUtil.java | 46 +++-- .../hcatalog/mapreduce/OutputFormatContainer.java | 8 +- .../hcatalog/mapreduce/RecordWriterContainer.java | 14 +- .../StaticPartitionFileRecordWriterContainer.java | 42 +++- .../hive/hcatalog/mapreduce/HCatMapReduceTest.java | 22 +-- .../mapreduce/TestHCatDynamicPartitioned.java | 1 + .../org/apache/hive/hcatalog/pig/PigHCatUtil.java | 2 +- .../apache/hadoop/hive/serde2/avro/AvroSerDe.java | 2 +- 19 files changed, 375 insertions(+), 264 deletions(-) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 93a03ad..9b009be 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -60,6 +60,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.data.Pair; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; @@ -71,6 +72,7 @@ import org.apache.hive.hcatalog.mapreduce.PartInfo; import org.apache.hive.hcatalog.mapreduce.StorerInfo; import org.apache.thrift.TException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -408,7 +410,7 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, String serDe, String inputFormat, String outputFormat) - throws IOException { + throws IOException { if ((storageHandler == null) || (storageHandler.equals(FosterStorageHandler.class.getName()))) { try { @@ -479,18 +481,17 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, @InterfaceAudience.Private @InterfaceStability.Evolving - public static void - configureOutputStorageHandler(HiveStorageHandler storageHandler, - Configuration conf, - OutputJobInfo outputJobInfo) { - //TODO replace IgnoreKeyTextOutputFormat with a - //HiveOutputFormatWrapper in StorageHandler + public static TableDesc configureOutputStorageHandler( + HiveStorageHandler storageHandler, Configuration conf, OutputJobInfo outputJobInfo) { + // TODO replace IgnoreKeyTextOutputFormat with a + // HiveOutputFormatWrapper in StorageHandler Properties props = outputJobInfo.getTableInfo().getStorerInfo().getProperties(); - props.put(serdeConstants.SERIALIZATION_LIB,storageHandler.getSerDeClass().getName()); + props.put(serdeConstants.SERIALIZATION_LIB, storageHandler.getSerDeClass().getName()); TableDesc tableDesc = new TableDesc(storageHandler.getInputFormatClass(), - IgnoreKeyTextOutputFormat.class,props); - if (tableDesc.getJobProperties() == null) + IgnoreKeyTextOutputFormat.class, props); + if (tableDesc.getJobProperties() == null) { tableDesc.setJobProperties(new HashMap()); + } for (Map.Entry el : conf) { tableDesc.getJobProperties().put(el.getKey(), el.getValue()); } @@ -502,19 +503,16 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, Map jobProperties = new HashMap(); try { - tableDesc.getJobProperties().put( - HCatConstants.HCAT_KEY_OUTPUT_INFO, - HCatUtil.serialize(outputJobInfo)); - - storageHandler.configureOutputJobProperties(tableDesc, - jobProperties); + tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, + HCatUtil.serialize(outputJobInfo)); + storageHandler.configureOutputJobProperties(tableDesc, jobProperties); Map tableJobProperties = tableDesc.getJobProperties(); if (tableJobProperties != null) { if (tableJobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO); if (jobString != null) { - if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { + if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); } @@ -528,6 +526,8 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, throw new IllegalStateException( "Failed to configure StorageHandler", e); } + + return tableDesc; } /** diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java index 3a07b0c..af26da5 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java @@ -19,19 +19,28 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; - +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.HCatRecord; import java.io.IOException; import java.text.NumberFormat; +import java.util.Properties; /** * Bare bones implementation of OutputFormatContainer. Does only the required @@ -47,7 +56,7 @@ NUMBER_FORMAT.setGroupingUsed(false); } - public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, Writable> of) { + public DefaultOutputFormatContainer(HiveOutputFormat of) { super(of); } @@ -63,14 +72,11 @@ static synchronized String getOutputName(int partition) { * @throws IOException */ @Override - public RecordWriter, HCatRecord> - getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - String name = getOutputName(context.getTaskAttemptID().getTaskID().getId()); - return new DefaultRecordWriterContainer(context, - getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context))); + public RecordWriter, HCatRecord> getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new DefaultRecordWriterContainer(context, getBaseOutputFormat()); } - /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. @@ -81,8 +87,9 @@ static synchronized String getOutputName(int partition) { */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter()); + throws IOException, InterruptedException { + return new DefaultOutputCommitterContainer(context, + new JobConf(context.getConfiguration()).getOutputCommitter()); } /** diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java index 209d7bc..955edcd 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java @@ -20,7 +20,13 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -28,7 +34,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; @@ -45,36 +53,57 @@ /** * @param context current JobContext - * @param baseRecordWriter RecordWriter to contain * @throws IOException * @throws InterruptedException */ public DefaultRecordWriterContainer(TaskAttemptContext context, - org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter) throws IOException, InterruptedException { - super(context, baseRecordWriter); - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + HiveOutputFormat baseOutputFormat) throws IOException { + super(context); + Configuration conf = context.getConfiguration(); + Properties tableProperties = new Properties(); + + jobInfo = HCatOutputFormat.getJobInfo(conf); + storageHandler = HCatUtil.getStorageHandler(conf, jobInfo.getTableInfo().getStorerInfo()); HCatOutputFormat.configureOutputStorageHandler(context); - serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), conf); hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, tableProperties, context.getConfiguration(), + jobInfo); } catch (SerDeException e) { throw new IOException("Failed to initialize SerDe", e); } + + // Initialize RecordWriter. + Path parentDir = new Path(conf.get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueFile(context, "part", "")); + + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + FileSinkOperator.RecordWriter recordWriter = baseOutputFormat.getHiveRecordWriter( + new JobConf(conf), childPath, valueClass, isCompressed, tableProperties, + InternalUtil.createReporter(context)); + setBaseRecordWriter(recordWriter); } @Override - public void close(TaskAttemptContext context) throws IOException, - InterruptedException { - getBaseRecordWriter().close(InternalUtil.createReporter(context)); + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + getBaseRecordWriter().close(false); } @Override public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { try { - getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI)); + getBaseRecordWriter().write(serDe.serialize(value.getAll(), hcatRecordOI)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java index 4df912a..33d88dd 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java @@ -24,24 +24,36 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; @@ -55,7 +67,7 @@ private final List dynamicPartCols; private int maxDynamicPartitions; - private final Map, ? super Writable>> baseDynamicWriters; + private final Map baseDynamicWriters; private final Map baseDynamicSerDe; private final Map baseDynamicCommitters; private final Map dynamicContexts; @@ -63,15 +75,13 @@ private Map dynamicOutputJobInfo; /** - * @param baseWriter RecordWriter to contain * @param context current TaskAttemptContext * @throws IOException * @throws InterruptedException */ - public DynamicPartitionFileRecordWriterContainer( - RecordWriter, ? super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(baseWriter, context); + public DynamicPartitionFileRecordWriterContainer(TaskAttemptContext context) + throws IOException, InterruptedException { + super(context); maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); dynamicPartCols = jobInfo.getPosOfDynPartCols(); if (dynamicPartCols == null) { @@ -80,8 +90,7 @@ public DynamicPartitionFileRecordWriterContainer( } this.baseDynamicSerDe = new HashMap(); - this.baseDynamicWriters = - new HashMap, ? super Writable>>(); + this.baseDynamicWriters = new HashMap(); this.baseDynamicCommitters = new HashMap(); this.dynamicContexts = new HashMap(); this.dynamicObjectInspectors = new HashMap(); @@ -90,15 +99,12 @@ public DynamicPartitionFileRecordWriterContainer( @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - Reporter reporter = InternalUtil.createReporter(context); - for (RecordWriter, ? super Writable> bwriter : baseDynamicWriters - .values()) { + for (FileSinkOperator.RecordWriter bwriter : baseDynamicWriters.values()) { // We are in RecordWriter.close() make sense that the context would be // TaskInputOutput. - bwriter.close(reporter); + bwriter.close(false); } - for (Map.Entry entry : baseDynamicCommitters - .entrySet()) { + for (Map.Entry entry : baseDynamicCommitters.entrySet()) { org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); OutputCommitter baseOutputCommitter = entry.getValue(); if (baseOutputCommitter.needsTaskCommit(currContext)) { @@ -118,92 +124,117 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio } String dynKey = dynamicPartValues.toString(); - if (!baseDynamicWriters.containsKey(dynKey)) { - if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { - throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, - "Number of dynamic partitions being created " - + "exceeds configured max allowable partitions[" + maxDynamicPartitions - + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + "] if needed."); - } + if (baseDynamicWriters.containsKey(dynKey)) { + return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), + baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey)); + } - org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(context); - configureDynamicStorageHandler(currTaskContext, dynamicPartValues); - localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); - - // Setup serDe. - SerDe currSerDe = - ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); - try { - InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), - localJobInfo); - } catch (SerDeException e) { - throw new IOException("Failed to initialize SerDe", e); - } + if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + maxDynamicPartitions + + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } - // create base OutputFormat - org.apache.hadoop.mapred.OutputFormat baseOF = - ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), - currTaskContext.getJobConf()); - - // We are skipping calling checkOutputSpecs() for each partition - // As it can throw a FileAlreadyExistsException when more than one - // mapper is writing to a partition. - // See HCATALOG-490, also to avoid contacting the namenode for each new - // FileOutputFormat instance. - // In general this should be ok for most FileOutputFormat implementations - // but may become an issue for cases when the method is used to perform - // other setup tasks. - - // Get Output Committer - org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = - currTaskContext.getJobConf().getOutputCommitter(); - - // Create currJobContext the latest so it gets all the config changes - org.apache.hadoop.mapred.JobContext currJobContext = - HCatMapRedUtil.createJobContext(currTaskContext); - - // Set up job. - baseOutputCommitter.setupJob(currJobContext); - - // Recreate to refresh jobConf of currTask context. - currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), - currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); - - // Set temp location. - currTaskContext.getConfiguration().set( - "mapred.work.output.dir", - new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) - .getWorkPath().toString()); - - // Set up task. - baseOutputCommitter.setupTask(currTaskContext); - - Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); - Path childPath = - new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); - - RecordWriter baseRecordWriter = - baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()), - currTaskContext.getJobConf(), childPath.toString(), - InternalUtil.createReporter(currTaskContext)); - - baseDynamicWriters.put(dynKey, baseRecordWriter); - baseDynamicSerDe.put(dynKey, currSerDe); - baseDynamicCommitters.put(dynKey, baseOutputCommitter); - dynamicContexts.put(dynKey, currTaskContext); - dynamicObjectInspectors.put(dynKey, - InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); - dynamicOutputJobInfo.put(dynKey, - HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(context); + configureDynamicStorageHandler(currTaskContext, dynamicPartValues); + localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); + + // Setup serDe. + SerDe currSerDe = ReflectionUtils.newInstance( + storageHandler.getSerDeClass(), currTaskContext.getJobConf()); + Properties tableProperties = new Properties(); + try { + InternalUtil.initializeOutputSerDe(currSerDe, tableProperties, + currTaskContext.getConfiguration(), localJobInfo); + } catch (SerDeException e) { + throw new IOException("Failed to initialize SerDe", e); } + // create base OutputFormat + Class outputFormatClass = + HiveFileFormatUtils.getOutputFormatSubstitute(storageHandler.getOutputFormatClass(), + false); + HiveOutputFormat baseOF = ReflectionUtils.newInstance(outputFormatClass, + currTaskContext.getJobConf()); + + // We are skipping calling checkOutputSpecs() for each partition + // As it can throw a FileAlreadyExistsException when more than one + // mapper is writing to a partition. + // See HCATALOG-490, also to avoid contacting the namenode for each new + // FileOutputFormat instance. + // In general this should be ok for most FileOutputFormat implementations + // but may become an issue for cases when the method is used to perform + // other setup tasks. + + // Get Output Committer + org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = + currTaskContext.getJobConf().getOutputCommitter(); + + // Create currJobContext the latest so it gets all the config changes + org.apache.hadoop.mapred.JobContext currJobContext = + HCatMapRedUtil.createJobContext(currTaskContext); + + // Set up job. + baseOutputCommitter.setupJob(currJobContext); + + // Recreate to refresh jobConf of currTask context. + currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), + currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); + + // Set temp location. + currTaskContext.getConfiguration().set( + "mapred.work.output.dir", + new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) + .getWorkPath().toString()); + + // Set up task. + baseOutputCommitter.setupTask(currTaskContext); + + Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); + + FileSinkOperator.RecordWriter baseRecordWriter = getHiveRecordWriter( + baseOF, currTaskContext.getJobConf(), childPath, tableProperties, localJobInfo, + InternalUtil.createReporter(currTaskContext)); + + baseDynamicWriters.put(dynKey, baseRecordWriter); + baseDynamicSerDe.put(dynKey, currSerDe); + baseDynamicCommitters.put(dynKey, baseOutputCommitter); + dynamicContexts.put(dynKey, currTaskContext); + dynamicObjectInspectors.put(dynKey, + InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); + dynamicOutputJobInfo.put(dynKey, + HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey)); } + protected FileSinkOperator.RecordWriter getHiveRecordWriter( + HiveOutputFormat baseOutputFormat, JobConf jobConf, Path path, Properties tableProperties, + OutputJobInfo jobInfo, Progressable progressable) throws IOException { + Configuration conf = this.context.getConfiguration(); + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + System.err.println("\n--> DynamicPartitionFileRecordWriterContainer"); + tableProperties.list(System.err); + return baseOutputFormat.getHiveRecordWriter( + jobConf, path, valueClass, isCompressed, tableProperties, + progressable); + } + protected void configureDynamicStorageHandler(JobContext context, List dynamicPartVals) throws IOException { HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals); diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java index 1a7595f..388d3ed 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java @@ -19,18 +19,23 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -43,17 +48,20 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; + import org.apache.thrift.TException; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; /** * File-based storage (ie RCFile, Text, etc) implementation of OutputFormatContainer. @@ -64,50 +72,38 @@ /** * @param of base OutputFormat to contain */ - public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, ? super Writable> of) { + public FileOutputFormatContainer(HiveOutputFormat of) { super(of); } @Override - public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - //this needs to be manually set, under normal circumstances MR Task does this + public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + // This needs to be manually set, under normal circumstances MR Task does this. setWorkOutputPath(context); - //Configure the output key and value classes. + // Configure the output key and value classes. // This is required for writing null as key for file based tables. - context.getConfiguration().set("mapred.output.key.class", - NullWritable.class.getName()); - String jobInfoString = context.getConfiguration().get( - HCatConstants.HCAT_KEY_OUTPUT_INFO); - OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil - .deserialize(jobInfoString); + context.getConfiguration().set("mapred.output.key.class", NullWritable.class.getName()); + String jobInfoString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); + OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobInfoString); StorerInfo storeInfo = jobInfo.getTableInfo().getStorerInfo(); HiveStorageHandler storageHandler = HCatUtil.getStorageHandler( - context.getConfiguration(), storeInfo); + context.getConfiguration(), storeInfo); Class serde = storageHandler.getSerDeClass(); - SerDe sd = (SerDe) ReflectionUtils.newInstance(serde, - context.getConfiguration()); - context.getConfiguration().set("mapred.output.value.class", - sd.getSerializedClass().getName()); + SerDe sd = (SerDe) ReflectionUtils.newInstance(serde, context.getConfiguration()); + context.getConfiguration().set("mapred.output.value.class", sd.getSerializedClass().getName()); + // Get RecordWriterContainer RecordWriter, HCatRecord> rw; - if (HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed()){ - // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null. - // (That's because records can't be written until the values of the dynamic partitions are deduced. - // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.) - rw = new DynamicPartitionFileRecordWriterContainer( - (org.apache.hadoop.mapred.RecordWriter)null, context); + if (HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed()) { + // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. + // Can use null. (That's because records can't be written until the values of the dynamic + // partitions are deduced. By that time, a new local instance of RecordWriter, with the + // correct output-path, will be constructed.) + rw = new DynamicPartitionFileRecordWriterContainer(context); } else { - Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir")); - Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part")); - - rw = new StaticPartitionFileRecordWriterContainer( - getBaseOutputFormat().getRecordWriter( - parentDir.getFileSystem(context.getConfiguration()), - new JobConf(context.getConfiguration()), - childPath.toString(), - InternalUtil.createReporter(context)), - context); + rw = new StaticPartitionFileRecordWriterContainer(context, getBaseOutputFormat()); } return rw; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java index 2a883d6..aabdc1a 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java @@ -24,9 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -35,13 +37,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; @@ -56,6 +58,7 @@ protected final HiveStorageHandler storageHandler; protected final SerDe serDe; protected final ObjectInspector objectInspector; + protected final Properties tableProperties; private final List partColsToDel; @@ -68,20 +71,20 @@ * @throws IOException * @throws InterruptedException */ - public FileRecordWriterContainer( - RecordWriter, ? super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(context, baseWriter); + public FileRecordWriterContainer(TaskAttemptContext context) + throws IOException, InterruptedException { + super(context); this.context = context; - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = - HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo() - .getStorerInfo()); + jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); + storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + jobInfo.getTableInfo().getStorerInfo()); serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); + tableProperties = new Properties(); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, tableProperties, context.getConfiguration(), + jobInfo); } catch (SerDeException e) { throw new IOException("Failed to inialize SerDe", e); } @@ -108,7 +111,7 @@ abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws I public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { LocalFileWriter localFileWriter = getLocalFileWriter(value); - RecordWriter localWriter = localFileWriter.getLocalWriter(); + FileSinkOperator.RecordWriter localWriter = localFileWriter.getLocalWriter(); ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector(); SerDe localSerDe = localFileWriter.getLocalSerDe(); OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo(); @@ -119,28 +122,27 @@ public void write(WritableComparable key, HCatRecord value) throws IOExceptio // The key given by user is ignored try { - localWriter.write(NullWritable.get(), - localSerDe.serialize(value.getAll(), localObjectInspector)); + localWriter.write(localSerDe.serialize(value.getAll(), localObjectInspector)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } } class LocalFileWriter { - private RecordWriter localWriter; + private FileSinkOperator.RecordWriter localWriter; private ObjectInspector localObjectInspector; private SerDe localSerDe; private OutputJobInfo localJobInfo; - public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector, - SerDe localSerDe, OutputJobInfo localJobInfo) { + public LocalFileWriter(FileSinkOperator.RecordWriter localWriter, + ObjectInspector localObjectInspector, SerDe localSerDe, OutputJobInfo localJobInfo) { this.localWriter = localWriter; this.localObjectInspector = localObjectInspector; this.localSerDe = localSerDe; this.localJobInfo = localJobInfo; } - public RecordWriter getLocalWriter() { + public FileSinkOperator.RecordWriter getLocalWriter() { return localWriter; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index bfa8657..60f7846 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -71,17 +72,17 @@ public FosterStorageHandler(Class ifClass, @Override public Class getInputFormatClass() { - return ifClass; //To change body of overridden methods use File | Settings | File Templates. + return ifClass; } @Override public Class getOutputFormatClass() { - return ofClass; //To change body of overridden methods use File | Settings | File Templates. + return ofClass; } @Override public Class getSerDeClass() { - return serDeClass; //To change body of implemented methods use File | Settings | File Templates. + return serDeClass; } @Override @@ -186,10 +187,9 @@ public void configureTableJobProperties(TableDesc tableDesc, return; } - OutputFormatContainer getOutputFormatContainer( - org.apache.hadoop.mapred.OutputFormat outputFormat) { + /*OutputFormatContainer getOutputFormatContainer(HiveOutputFormat outputFormat) { return new FileOutputFormatContainer(outputFormat); - } + }*/ @Override public Configuration getConf() { diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java index 4f7a74a..5a52412 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -25,12 +25,16 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; @@ -60,8 +64,7 @@ public static HCatSchema getTableSchema(Configuration conf) throws IOException { * @throws IOException when output should not be attempted */ @Override - public void checkOutputSpecs(JobContext context - ) throws IOException, InterruptedException { + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { getOutputFormat(context).checkOutputSpecs(context); } @@ -71,20 +74,24 @@ public void checkOutputSpecs(JobContext context * @return the output format instance * @throws IOException */ - protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) - throws IOException { + protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) + throws IOException { OutputJobInfo jobInfo = getJobInfo(context.getConfiguration()); - HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); // Always configure storage handler with jobproperties/jobconf before calling any methods on it configureOutputStorageHandler(context); + + // If the OutputFormat is not a HiveOutputFormat, find a substitute. + Class outputFormatClass = + HiveFileFormatUtils.getOutputFormatSubstitute(storageHandler.getOutputFormatClass(), + false); if (storageHandler instanceof FosterStorageHandler) { - return new FileOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); - } - else { - return new DefaultOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); + return new FileOutputFormatContainer( + ReflectionUtils.newInstance(outputFormatClass, context.getConfiguration())); + } else { + return new DefaultOutputFormatContainer( + ReflectionUtils.newInstance(outputFormatClass, context.getConfiguration())); } } @@ -146,13 +153,6 @@ static void configureOutputStorageHandler( partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i)); } -// // re-home location, now that we know the rest of the partvals -// Table table = jobInfo.getTableInfo().getTable(); -// -// List partitionCols = new ArrayList(); -// for(FieldSchema schema : table.getPartitionKeys()) { -// partitionCols.add(schema.getName()); -// } jobInfo.setPartitionValues(partitionValues); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java index b651cb3..0cfeee1 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java @@ -31,13 +31,16 @@ public class HCatMapRedUtil { - public static TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapreduce.TaskAttemptContext context) { - return createTaskAttemptContext(new JobConf(context.getConfiguration()), - org.apache.hadoop.mapred.TaskAttemptID.forName(context.getTaskAttemptID().toString()), - Reporter.NULL); + public static TaskAttemptContext createTaskAttemptContext( + org.apache.hadoop.mapreduce.TaskAttemptContext context) { + String taskAttemptId = context.getTaskAttemptID().toString(); + return createTaskAttemptContext(new JobConf(context.getConfiguration()), + org.apache.hadoop.mapred.TaskAttemptID.forName(taskAttemptId), + Reporter.NULL); } - public static org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, org.apache.hadoop.mapreduce.TaskAttemptID id) { + public static org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext( + Configuration conf, org.apache.hadoop.mapreduce.TaskAttemptID id) { return ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(conf,id); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java index 6947398..7163228 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java @@ -31,11 +31,12 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; @@ -43,12 +44,14 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.security.Credentials; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatSchema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,9 +257,8 @@ public static void setSchema(final Configuration conf, final HCatSchema schema) * @throws InterruptedException */ @Override - public RecordWriter, HCatRecord> - getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { + public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { return getOutputFormat(context).getRecordWriter(context); } @@ -270,8 +272,8 @@ public static void setSchema(final Configuration conf, final HCatSchema schema) * @throws InterruptedException */ @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context - ) throws IOException, InterruptedException { + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { return getOutputFormat(context).getOutputCommitter(context); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java index 1980ef5..1b1acdb 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java @@ -102,8 +102,8 @@ private static InputJobInfo getInputJobInfo( hiveConf = new HiveConf(HCatInputFormat.class); } client = HCatUtil.getHiveClient(hiveConf); - Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + Table table = new Table(client.getTable(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName())); List partInfoList = new ArrayList(); diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java index 9b97939..d4ac3ce 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java @@ -24,7 +24,10 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,9 +47,11 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +77,6 @@ static StorerInfo extractStorerInfo(StorageDescriptor sd, Map pr hcatProperties.put(param.getKey(), param.getValue()); } - return new StorerInfo( sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(), properties.get(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE), @@ -139,41 +143,47 @@ private static ObjectInspector getObjectInspector(TypeInfo type) throws IOExcept } } - //TODO this has to find a better home, it's also hardcoded as default in hive would be nice + // TODO this has to find a better home, it's also hardcoded as default in hive would be nice // if the default was decided by the serde - static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo) - throws SerDeException { - SerDeUtils.initializeSerDe(serDe, conf, - getSerdeProperties(jobInfo.getTableInfo(), - jobInfo.getOutputSchema()), - null); + public static void initializeOutputSerDe(SerDe serDe, Properties tableProperties, + Configuration conf, OutputJobInfo jobInfo) throws SerDeException { + TableDesc tableDesc = Utilities.getTableDesc(new Table(jobInfo.getTableInfo().getTable())); + tableProperties.putAll(tableDesc.getProperties()); + SerDeUtils.initializeSerDe(serDe, conf, tableProperties, null); + + System.err.println("\n--> initializeOutputSerDe"); + tableProperties.list(System.err); } - static void initializeDeserializer(Deserializer deserializer, Configuration conf, - HCatTableInfo info, HCatSchema schema) throws SerDeException { - Properties props = getSerdeProperties(info, schema); + public static void initializeDeserializer(Deserializer deserializer, Configuration conf, + HCatTableInfo info, HCatSchema schema) throws SerDeException { + TableDesc tableDesc = Utilities.getTableDesc(new Table(info.getTable())); + Properties props = tableDesc.getProperties(); LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props); SerDeUtils.initializeSerDe(deserializer, conf, props, null); + + System.err.println("\n--> initializeDeserializer"); + props.list(System.err); } - private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s) - throws SerDeException { + /*private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s) + throws SerDeException { Properties props = new Properties(); List fields = HCatUtil.getFieldSchemaList(s.getFields()); - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, + props.setProperty(serdeConstants.LIST_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); // setting these props to match LazySimpleSerde - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "\\N"); - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "1"); + props.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "\\N"); + props.setProperty(serdeConstants.SERIALIZATION_FORMAT, "1"); //add props from params set in table schema props.putAll(info.getStorerInfo().getProperties()); return props; - } + }*/ static Reporter createReporter(TaskAttemptContext context) { return new ProgressReporter(context); diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java index d83b003..302d18a 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java @@ -19,9 +19,11 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.OutputFormat; + import org.apache.hive.hcatalog.data.HCatRecord; /** @@ -34,19 +36,19 @@ * such as partitioning isn't supported. */ abstract class OutputFormatContainer extends OutputFormat, HCatRecord> { - private org.apache.hadoop.mapred.OutputFormat, ? super Writable> of; + private HiveOutputFormat of; /** * @param of OutputFormat this instance will contain */ - public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, ? super Writable> of) { + public OutputFormatContainer(HiveOutputFormat of) { this.of = of; } /** * @return underlying OutputFormat */ - public org.apache.hadoop.mapred.OutputFormat getBaseOutputFormat() { + public HiveOutputFormat getBaseOutputFormat() { return of; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java index 5905b46..50c0f57 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java @@ -24,29 +24,33 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hive.hcatalog.data.HCatRecord; /** * This class will contain an implementation of an RecordWriter. * See {@link OutputFormatContainer} for more information about containers. */ -abstract class RecordWriterContainer extends RecordWriter, HCatRecord> { +abstract class RecordWriterContainer extends RecordWriter, HCatRecord> { - private final org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter; + private FileSinkOperator.RecordWriter baseRecordWriter; /** * @param context current JobContext * @param baseRecordWriter RecordWriter that this instance will contain */ - public RecordWriterContainer(TaskAttemptContext context, - org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter) { + public RecordWriterContainer(TaskAttemptContext context) { + this.baseRecordWriter = null; + } + + protected void setBaseRecordWriter(FileSinkOperator.RecordWriter baseRecordWriter) { this.baseRecordWriter = baseRecordWriter; } /** * @return underlying RecordWriter */ - public org.apache.hadoop.mapred.RecordWriter getBaseRecordWriter() { + public FileSinkOperator.RecordWriter getBaseRecordWriter() { return baseRecordWriter; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java index b3ea76e..b85cc1a 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java @@ -20,12 +20,18 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hive.hcatalog.mapreduce.FileRecordWriterContainer; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.HCatRecord; @@ -41,16 +47,38 @@ * @throws IOException * @throws InterruptedException */ - public StaticPartitionFileRecordWriterContainer( - RecordWriter, ? super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(baseWriter, context); + public StaticPartitionFileRecordWriterContainer(TaskAttemptContext context, + HiveOutputFormat baseOutputFormat) + throws IOException, InterruptedException { + super(context); + Configuration conf = context.getConfiguration(); + JobConf jobConf = new JobConf(conf); + Path parentDir = new Path(conf.get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueName(jobConf, "part")); + + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + System.err.println("\n--> FileOutputFormatContainer"); + tableProperties.list(System.err); + + FileSinkOperator.RecordWriter recordWriter = + baseOutputFormat.getHiveRecordWriter( + jobConf, childPath, valueClass, isCompressed, tableProperties, + InternalUtil.createReporter(context)); + setBaseRecordWriter(recordWriter); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - Reporter reporter = InternalUtil.createReporter(context); - getBaseRecordWriter().close(reporter); + getBaseRecordWriter().close(false); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java index ee57f3f..42e9977 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java @@ -66,7 +66,6 @@ import junit.framework.Assert; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.runner.RunWith; @@ -102,16 +101,16 @@ protected String inputFormatClass; protected String outputFormatClass; - /** - * List of SerDe classes that the HCatalog core tests will not be run against. - */ - public static final Set DISABLED_SERDES = ImmutableSet.of( - AvroSerDe.class.getName(), - ParquetHiveSerDe.class.getName()); - @Parameterized.Parameters public static Collection generateParameters() { - return StorageFormats.asParameters(); + List parameters = (List) StorageFormats.asParameters(); + + for (int i = 0; i < parameters.size(); i++) { + Object[] params = parameters.get(i); + System.err.println("[" + i + "] " + params[0]); + } + + return parameters; } /** @@ -119,6 +118,7 @@ */ public HCatMapReduceTest(String name, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { + System.err.println("\n==> " + name); this.serdeClass = serdeClass; this.inputFormatClass = inputFormatClass; this.outputFormatClass = outputFormatClass; @@ -173,10 +173,6 @@ public void deleteTable() throws Exception { @Before public void createTable() throws Exception { - // Use Junit's Assume to skip running this fixture against any storage formats whose - // SerDe is in the disabled serdes list. - Assume.assumeTrue(!DISABLED_SERDES.contains(serdeClass)); - String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; try { client.dropTable(databaseName, tableName); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java index 0d87c6c..38a1fdc 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java @@ -119,6 +119,7 @@ public void testHCatDynamicPartitionedTableMultipleTask() throws Exception { protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask, String customDynamicPathPattern) throws Exception { generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + assertEquals(NUM_RECORDS, writeRecords.size()); runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern); runMRRead(NUM_RECORDS); diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java index 7c9003e..fb93dfd 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java @@ -192,7 +192,7 @@ public Table getTable(String location, String hcatServerUri, String hcatServerPr HiveMetaStoreClient client = null; try { client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class); - table = HCatUtil.getTable(client, dbName, tableName); + table = new Table(client.getTable(dbName, tableName)); } catch (NoSuchObjectException nsoe) { throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend } catch (Exception e) { diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 69545b0..a101f26 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -90,7 +90,7 @@ public void initialize(Configuration configuration, Properties properties) throw columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); List columnComments; - if (columnCommentProperty.isEmpty()) { + if (columnCommentProperty == null || columnCommentProperty.isEmpty()) { columnComments = new ArrayList(); } else { columnComments = Arrays.asList(columnCommentProperty.split(",")); -- 1.8.3.4 (Apple Git-47)