diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java index 2f624df..7c2708b 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java @@ -62,6 +62,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.data.Pair; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; @@ -73,6 +74,7 @@ import org.apache.hive.hcatalog.mapreduce.PartInfo; import org.apache.hive.hcatalog.mapreduce.StorerInfo; import org.apache.thrift.TException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -396,7 +398,7 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, String serDe, String inputFormat, String outputFormat) - throws IOException { + throws IOException { if ((storageHandler == null) || (storageHandler.equals(FosterStorageHandler.class.getName()))) { try { @@ -467,18 +469,17 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, @InterfaceAudience.Private @InterfaceStability.Evolving - public static void - configureOutputStorageHandler(HiveStorageHandler storageHandler, - Configuration conf, - OutputJobInfo outputJobInfo) { - //TODO replace IgnoreKeyTextOutputFormat with a - //HiveOutputFormatWrapper in StorageHandler + public static TableDesc configureOutputStorageHandler( + HiveStorageHandler storageHandler, Configuration conf, OutputJobInfo outputJobInfo) { + // TODO replace IgnoreKeyTextOutputFormat with a + // HiveOutputFormatWrapper in StorageHandler Properties props = outputJobInfo.getTableInfo().getStorerInfo().getProperties(); - props.put(serdeConstants.SERIALIZATION_LIB,storageHandler.getSerDeClass().getName()); + props.put(serdeConstants.SERIALIZATION_LIB, storageHandler.getSerDeClass().getName()); TableDesc tableDesc = new TableDesc(storageHandler.getInputFormatClass(), - IgnoreKeyTextOutputFormat.class,props); - if (tableDesc.getJobProperties() == null) + IgnoreKeyTextOutputFormat.class, props); + if (tableDesc.getJobProperties() == null) { tableDesc.setJobProperties(new HashMap()); + } for (Map.Entry el : conf) { tableDesc.getJobProperties().put(el.getKey(), el.getValue()); } @@ -490,19 +491,16 @@ public static HiveStorageHandler getStorageHandler(Configuration conf, Map jobProperties = new HashMap(); try { - tableDesc.getJobProperties().put( - HCatConstants.HCAT_KEY_OUTPUT_INFO, - HCatUtil.serialize(outputJobInfo)); - - storageHandler.configureOutputJobProperties(tableDesc, - jobProperties); + tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, + HCatUtil.serialize(outputJobInfo)); + storageHandler.configureOutputJobProperties(tableDesc, jobProperties); Map tableJobProperties = tableDesc.getJobProperties(); if (tableJobProperties != null) { if (tableJobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO); if (jobString != null) { - if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { + if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)) { jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); } @@ -516,6 +514,8 
@@ public static HiveStorageHandler getStorageHandler(Configuration conf, throw new IllegalStateException( "Failed to configure StorageHandler", e); } + + return tableDesc; } /** diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java index 3a07b0c..af26da5 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultOutputFormatContainer.java @@ -19,19 +19,28 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; - +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.data.HCatRecord; import java.io.IOException; import java.text.NumberFormat; +import java.util.Properties; /** * Bare bones implementation of OutputFormatContainer. Does only the required @@ -47,7 +56,7 @@ NUMBER_FORMAT.setGroupingUsed(false); } - public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, Writable> of) { + public DefaultOutputFormatContainer(HiveOutputFormat of) { super(of); } @@ -63,14 +72,11 @@ static synchronized String getOutputName(int partition) { * @throws IOException */ @Override - public RecordWriter, HCatRecord> - getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - String name = getOutputName(context.getTaskAttemptID().getTaskID().getId()); - return new DefaultRecordWriterContainer(context, - getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context))); + public RecordWriter, HCatRecord> getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new DefaultRecordWriterContainer(context, getBaseOutputFormat()); } - /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. 
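The HCatUtil hunk above now returns the TableDesc it configures, after folding every entry of the job Configuration into the descriptor's job-properties map. Distilled into a self-contained sketch (the class and method names below are illustrative, not part of the patch):

```java
// Minimal sketch of the conf-to-TableDesc merge performed by the refactored
// configureOutputStorageHandler(); names here are illustrative only.
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.plan.TableDesc;

final class JobPropertiesMergeSketch {
  static TableDesc copyConfIntoJobProperties(TableDesc tableDesc, Configuration conf) {
    if (tableDesc.getJobProperties() == null) {
      tableDesc.setJobProperties(new HashMap<String, String>());
    }
    // Configuration is iterable as key/value entries; every entry becomes a job property.
    for (Map.Entry<String, String> el : conf) {
      tableDesc.getJobProperties().put(el.getKey(), el.getValue());
    }
    return tableDesc; // returned so callers can reuse the merged properties
  }
}
```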
@@ -81,8 +87,9 @@ static synchronized String getOutputName(int partition) { */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter()); + throws IOException, InterruptedException { + return new DefaultOutputCommitterContainer(context, + new JobConf(context.getConfiguration()).getOutputCommitter()); } /** diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java index 209d7bc..955edcd 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DefaultRecordWriterContainer.java @@ -20,7 +20,13 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -28,7 +34,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; @@ -45,36 +53,57 @@ /** * @param context current JobContext - * @param baseRecordWriter RecordWriter to contain * @throws IOException * @throws InterruptedException */ public DefaultRecordWriterContainer(TaskAttemptContext context, - org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter) throws IOException, InterruptedException { - super(context, baseRecordWriter); - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + HiveOutputFormat baseOutputFormat) throws IOException { + super(context); + Configuration conf = context.getConfiguration(); + Properties tableProperties = new Properties(); + + jobInfo = HCatOutputFormat.getJobInfo(conf); + storageHandler = HCatUtil.getStorageHandler(conf, jobInfo.getTableInfo().getStorerInfo()); HCatOutputFormat.configureOutputStorageHandler(context); - serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), conf); hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, tableProperties, context.getConfiguration(), + jobInfo); } catch (SerDeException e) { throw new IOException("Failed to initialize SerDe", e); } + + // Initialize RecordWriter. 
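The record-writer containers below stop wrapping org.apache.hadoop.mapred.RecordWriter and hold org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter instead, obtained from HiveOutputFormat.getHiveRecordWriter(JobConf, Path, valueClass, isCompressed, tableProperties, Progressable) as the constructor code shows. To make the new contract concrete (write() takes only the serialized value, and close() takes an abort flag rather than a Reporter), here is a trivial, purely illustrative implementation:

```java
// Illustrative only: the shape of the Hive-side writer interface the containers now delegate to.
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.Writable;

class CountingHiveRecordWriter implements FileSinkOperator.RecordWriter {
  private long rows;

  @Override
  public void write(Writable value) throws IOException {
    rows++; // a real writer hands the serialized row to the underlying file format
  }

  @Override
  public void close(boolean abort) throws IOException {
    // abort == true means the task failed and partial output may be discarded
  }
}
```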
+ Path parentDir = new Path(conf.get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueFile(context, "part", "")); + + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + FileSinkOperator.RecordWriter recordWriter = baseOutputFormat.getHiveRecordWriter( + new JobConf(conf), childPath, valueClass, isCompressed, tableProperties, + InternalUtil.createReporter(context)); + setBaseRecordWriter(recordWriter); } @Override - public void close(TaskAttemptContext context) throws IOException, - InterruptedException { - getBaseRecordWriter().close(InternalUtil.createReporter(context)); + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + getBaseRecordWriter().close(false); } @Override public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { try { - getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI)); + getBaseRecordWriter().write(serDe.serialize(value.getAll(), hcatRecordOI)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java index 4df912a..8ef258b 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java @@ -24,24 +24,35 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; @@ -55,7 +66,7 @@ private final List dynamicPartCols; private int maxDynamicPartitions; - private final Map, ? 
super Writable>> baseDynamicWriters; + private final Map baseDynamicWriters; private final Map baseDynamicSerDe; private final Map baseDynamicCommitters; private final Map dynamicContexts; @@ -63,15 +74,13 @@ private Map dynamicOutputJobInfo; /** - * @param baseWriter RecordWriter to contain * @param context current TaskAttemptContext * @throws IOException * @throws InterruptedException */ - public DynamicPartitionFileRecordWriterContainer( - RecordWriter, ? super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(baseWriter, context); + public DynamicPartitionFileRecordWriterContainer(TaskAttemptContext context) + throws IOException, InterruptedException { + super(context); maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); dynamicPartCols = jobInfo.getPosOfDynPartCols(); if (dynamicPartCols == null) { @@ -80,8 +89,7 @@ public DynamicPartitionFileRecordWriterContainer( } this.baseDynamicSerDe = new HashMap(); - this.baseDynamicWriters = - new HashMap, ? super Writable>>(); + this.baseDynamicWriters = new HashMap(); this.baseDynamicCommitters = new HashMap(); this.dynamicContexts = new HashMap(); this.dynamicObjectInspectors = new HashMap(); @@ -90,15 +98,12 @@ public DynamicPartitionFileRecordWriterContainer( @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - Reporter reporter = InternalUtil.createReporter(context); - for (RecordWriter, ? super Writable> bwriter : baseDynamicWriters - .values()) { + for (FileSinkOperator.RecordWriter bwriter : baseDynamicWriters.values()) { // We are in RecordWriter.close() make sense that the context would be // TaskInputOutput. - bwriter.close(reporter); + bwriter.close(false); } - for (Map.Entry entry : baseDynamicCommitters - .entrySet()) { + for (Map.Entry entry : baseDynamicCommitters.entrySet()) { org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); OutputCommitter baseOutputCommitter = entry.getValue(); if (baseOutputCommitter.needsTaskCommit(currContext)) { @@ -118,92 +123,114 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOExceptio } String dynKey = dynamicPartValues.toString(); - if (!baseDynamicWriters.containsKey(dynKey)) { - if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { - throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, - "Number of dynamic partitions being created " - + "exceeds configured max allowable partitions[" + maxDynamicPartitions - + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + "] if needed."); - } + if (baseDynamicWriters.containsKey(dynKey)) { + return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), + baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey)); + } - org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(context); - configureDynamicStorageHandler(currTaskContext, dynamicPartValues); - localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); - - // Setup serDe. 
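getLocalFileWriter() keeps its get-or-create structure through this rewrite: look up a writer by the dynamic-partition key, otherwise enforce the partition cap and build one. A condensed sketch of that pattern (the real method, continued below, also caches SerDes, committers, task contexts, and job infos per key):

```java
// Sketch of the per-partition writer cache; class and message text are illustrative.
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;

abstract class DynamicWriterCacheSketch {
  private final Map<String, FileSinkOperator.RecordWriter> writers =
      new HashMap<String, FileSinkOperator.RecordWriter>();
  private final int maxDynamicPartitions;

  DynamicWriterCacheSketch(int maxDynamicPartitions) {
    this.maxDynamicPartitions = maxDynamicPartitions;
  }

  FileSinkOperator.RecordWriter writerFor(List<String> dynamicPartValues) throws IOException {
    String dynKey = dynamicPartValues.toString();
    FileSinkOperator.RecordWriter writer = writers.get(dynKey);
    if (writer != null) {
      return writer; // early return keeps the creation path un-nested, as in the patch
    }
    if (maxDynamicPartitions != -1 && writers.size() > maxDynamicPartitions) {
      throw new IOException("Too many dynamic partitions: " + writers.size());
    }
    writer = createWriter(dynamicPartValues); // per-partition setup: committer, temp dir, file
    writers.put(dynKey, writer);
    return writer;
  }

  abstract FileSinkOperator.RecordWriter createWriter(List<String> dynamicPartValues)
      throws IOException;
}
```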
- SerDe currSerDe = - ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); - try { - InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), - localJobInfo); - } catch (SerDeException e) { - throw new IOException("Failed to initialize SerDe", e); - } + if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + maxDynamicPartitions + + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } - // create base OutputFormat - org.apache.hadoop.mapred.OutputFormat baseOF = - ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), - currTaskContext.getJobConf()); - - // We are skipping calling checkOutputSpecs() for each partition - // As it can throw a FileAlreadyExistsException when more than one - // mapper is writing to a partition. - // See HCATALOG-490, also to avoid contacting the namenode for each new - // FileOutputFormat instance. - // In general this should be ok for most FileOutputFormat implementations - // but may become an issue for cases when the method is used to perform - // other setup tasks. - - // Get Output Committer - org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = - currTaskContext.getJobConf().getOutputCommitter(); - - // Create currJobContext the latest so it gets all the config changes - org.apache.hadoop.mapred.JobContext currJobContext = - HCatMapRedUtil.createJobContext(currTaskContext); - - // Set up job. - baseOutputCommitter.setupJob(currJobContext); - - // Recreate to refresh jobConf of currTask context. - currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), - currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); - - // Set temp location. - currTaskContext.getConfiguration().set( - "mapred.work.output.dir", - new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) - .getWorkPath().toString()); - - // Set up task. - baseOutputCommitter.setupTask(currTaskContext); - - Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); - Path childPath = - new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); - - RecordWriter baseRecordWriter = - baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()), - currTaskContext.getJobConf(), childPath.toString(), - InternalUtil.createReporter(currTaskContext)); - - baseDynamicWriters.put(dynKey, baseRecordWriter); - baseDynamicSerDe.put(dynKey, currSerDe); - baseDynamicCommitters.put(dynKey, baseOutputCommitter); - dynamicContexts.put(dynKey, currTaskContext); - dynamicObjectInspectors.put(dynKey, - InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); - dynamicOutputJobInfo.put(dynKey, - HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(context); + configureDynamicStorageHandler(currTaskContext, dynamicPartValues); + localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); + + // Setup serDe. 
+ SerDe currSerDe = ReflectionUtils.newInstance( + storageHandler.getSerDeClass(), currTaskContext.getJobConf()); + Properties tableProperties = new Properties(); + try { + InternalUtil.initializeOutputSerDe(currSerDe, tableProperties, + currTaskContext.getConfiguration(), localJobInfo); + } catch (SerDeException e) { + throw new IOException("Failed to initialize SerDe", e); } + // create base OutputFormat + Table table = new Table(localJobInfo.getTableInfo().getTable()); + Class outputFormatClass = table.getOutputFormatClass(); + HiveOutputFormat baseOF = ReflectionUtils.newInstance(outputFormatClass, + currTaskContext.getJobConf()); + + // We are skipping calling checkOutputSpecs() for each partition + // As it can throw a FileAlreadyExistsException when more than one + // mapper is writing to a partition. + // See HCATALOG-490, also to avoid contacting the namenode for each new + // FileOutputFormat instance. + // In general this should be ok for most FileOutputFormat implementations + // but may become an issue for cases when the method is used to perform + // other setup tasks. + + // Get Output Committer + org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = + currTaskContext.getJobConf().getOutputCommitter(); + + // Create currJobContext the latest so it gets all the config changes + org.apache.hadoop.mapred.JobContext currJobContext = + HCatMapRedUtil.createJobContext(currTaskContext); + + // Set up job. + baseOutputCommitter.setupJob(currJobContext); + + // Recreate to refresh jobConf of currTask context. + currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), + currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); + + // Set temp location. + currTaskContext.getConfiguration().set( + "mapred.work.output.dir", + new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) + .getWorkPath().toString()); + + // Set up task. 
+ baseOutputCommitter.setupTask(currTaskContext); + + Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); + + FileSinkOperator.RecordWriter baseRecordWriter = getHiveRecordWriter( + baseOF, currTaskContext.getJobConf(), childPath, tableProperties, localJobInfo, + InternalUtil.createReporter(currTaskContext)); + + baseDynamicWriters.put(dynKey, baseRecordWriter); + baseDynamicSerDe.put(dynKey, currSerDe); + baseDynamicCommitters.put(dynKey, baseOutputCommitter); + dynamicContexts.put(dynKey, currTaskContext); + dynamicObjectInspectors.put(dynKey, + InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); + dynamicOutputJobInfo.put(dynKey, + HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey)); } + protected FileSinkOperator.RecordWriter getHiveRecordWriter( + HiveOutputFormat baseOutputFormat, JobConf jobConf, Path path, Properties tableProperties, + OutputJobInfo jobInfo, Progressable progressable) throws IOException { + Configuration conf = this.context.getConfiguration(); + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + return baseOutputFormat.getHiveRecordWriter( + jobConf, path, valueClass, isCompressed, tableProperties, + progressable); + } + protected void configureDynamicStorageHandler(JobContext context, List dynamicPartVals) throws IOException { HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals); diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java index 1a7595f..388d3ed 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java @@ -19,18 +19,23 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -43,17 +48,20 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; + import org.apache.thrift.TException; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; /** * File-based storage (ie RCFile, Text, etc) implementation of OutputFormatContainer. @@ -64,50 +72,38 @@ /** * @param of base OutputFormat to contain */ - public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, ? super Writable> of) { + public FileOutputFormatContainer(HiveOutputFormat of) { super(of); } @Override - public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - //this needs to be manually set, under normal circumstances MR Task does this + public RecordWriter, HCatRecord> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + // This needs to be manually set, under normal circumstances MR Task does this. setWorkOutputPath(context); - //Configure the output key and value classes. + // Configure the output key and value classes. // This is required for writing null as key for file based tables. - context.getConfiguration().set("mapred.output.key.class", - NullWritable.class.getName()); - String jobInfoString = context.getConfiguration().get( - HCatConstants.HCAT_KEY_OUTPUT_INFO); - OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil - .deserialize(jobInfoString); + context.getConfiguration().set("mapred.output.key.class", NullWritable.class.getName()); + String jobInfoString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); + OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(jobInfoString); StorerInfo storeInfo = jobInfo.getTableInfo().getStorerInfo(); HiveStorageHandler storageHandler = HCatUtil.getStorageHandler( - context.getConfiguration(), storeInfo); + context.getConfiguration(), storeInfo); Class serde = storageHandler.getSerDeClass(); - SerDe sd = (SerDe) ReflectionUtils.newInstance(serde, - context.getConfiguration()); - context.getConfiguration().set("mapred.output.value.class", - sd.getSerializedClass().getName()); + SerDe sd = (SerDe) ReflectionUtils.newInstance(serde, context.getConfiguration()); + context.getConfiguration().set("mapred.output.value.class", sd.getSerializedClass().getName()); + // Get RecordWriterContainer RecordWriter, HCatRecord> rw; - if (HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed()){ - // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null. - // (That's because records can't be written until the values of the dynamic partitions are deduced. - // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.) - rw = new DynamicPartitionFileRecordWriterContainer( - (org.apache.hadoop.mapred.RecordWriter)null, context); + if (HCatBaseOutputFormat.getJobInfo(context.getConfiguration()).isDynamicPartitioningUsed()) { + // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. + // Can use null. (That's because records can't be written until the values of the dynamic + // partitions are deduced. 
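getRecordWriter() above publishes the key and value classes into the configuration, and the writer containers later resolve the value class back with Class.forName. The round trip, pulled out into an illustrative helper:

```java
// Sketch of the value-class hand-off between the output format and its writer containers;
// the helper and method names are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;

final class OutputClassHandOff {
  // Publisher side: key is always NullWritable, value is whatever the SerDe serializes to.
  static void publish(Configuration conf, SerDe sd) {
    conf.set("mapred.output.key.class", NullWritable.class.getName());
    conf.set("mapred.output.value.class", sd.getSerializedClass().getName());
  }

  // Consumer side: resolve the recorded value class when building the Hive record writer.
  @SuppressWarnings("unchecked")
  static Class<? extends Writable> resolve(Configuration conf) {
    try {
      return (Class<? extends Writable>) Class.forName(conf.get("mapred.output.value.class"));
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("mapred.output.value.class is not on the classpath", e);
    }
  }
}
```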
By that time, a new local instance of RecordWriter, with the + // correct output-path, will be constructed.) + rw = new DynamicPartitionFileRecordWriterContainer(context); } else { - Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir")); - Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part")); - - rw = new StaticPartitionFileRecordWriterContainer( - getBaseOutputFormat().getRecordWriter( - parentDir.getFileSystem(context.getConfiguration()), - new JobConf(context.getConfiguration()), - childPath.toString(), - InternalUtil.createReporter(context)), - context); + rw = new StaticPartitionFileRecordWriterContainer(context, getBaseOutputFormat()); } return rw; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java index 2a883d6..aabdc1a 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java @@ -24,9 +24,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -35,13 +37,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; @@ -56,6 +58,7 @@ protected final HiveStorageHandler storageHandler; protected final SerDe serDe; protected final ObjectInspector objectInspector; + protected final Properties tableProperties; private final List partColsToDel; @@ -68,20 +71,20 @@ * @throws IOException * @throws InterruptedException */ - public FileRecordWriterContainer( - RecordWriter, ? 
super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(context, baseWriter); + public FileRecordWriterContainer(TaskAttemptContext context) + throws IOException, InterruptedException { + super(context); this.context = context; - jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = - HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo() - .getStorerInfo()); + jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); + storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + jobInfo.getTableInfo().getStorerInfo()); serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); + tableProperties = new Properties(); try { - InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); + InternalUtil.initializeOutputSerDe(serDe, tableProperties, context.getConfiguration(), + jobInfo); } catch (SerDeException e) { throw new IOException("Failed to inialize SerDe", e); } @@ -108,7 +111,7 @@ abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws I public void write(WritableComparable key, HCatRecord value) throws IOException, InterruptedException { LocalFileWriter localFileWriter = getLocalFileWriter(value); - RecordWriter localWriter = localFileWriter.getLocalWriter(); + FileSinkOperator.RecordWriter localWriter = localFileWriter.getLocalWriter(); ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector(); SerDe localSerDe = localFileWriter.getLocalSerDe(); OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo(); @@ -119,28 +122,27 @@ public void write(WritableComparable key, HCatRecord value) throws IOExceptio // The key given by user is ignored try { - localWriter.write(NullWritable.get(), - localSerDe.serialize(value.getAll(), localObjectInspector)); + localWriter.write(localSerDe.serialize(value.getAll(), localObjectInspector)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } } class LocalFileWriter { - private RecordWriter localWriter; + private FileSinkOperator.RecordWriter localWriter; private ObjectInspector localObjectInspector; private SerDe localSerDe; private OutputJobInfo localJobInfo; - public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector, - SerDe localSerDe, OutputJobInfo localJobInfo) { + public LocalFileWriter(FileSinkOperator.RecordWriter localWriter, + ObjectInspector localObjectInspector, SerDe localSerDe, OutputJobInfo localJobInfo) { this.localWriter = localWriter; this.localObjectInspector = localObjectInspector; this.localSerDe = localSerDe; this.localJobInfo = localJobInfo; } - public RecordWriter getLocalWriter() { + public FileSinkOperator.RecordWriter getLocalWriter() { return localWriter; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index bfa8657..60f7846 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import 
org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -71,17 +72,17 @@ public FosterStorageHandler(Class ifClass, @Override public Class getInputFormatClass() { - return ifClass; //To change body of overridden methods use File | Settings | File Templates. + return ifClass; } @Override public Class getOutputFormatClass() { - return ofClass; //To change body of overridden methods use File | Settings | File Templates. + return ofClass; } @Override public Class getSerDeClass() { - return serDeClass; //To change body of implemented methods use File | Settings | File Templates. + return serDeClass; } @Override @@ -186,10 +187,9 @@ public void configureTableJobProperties(TableDesc tableDesc, return; } - OutputFormatContainer getOutputFormatContainer( - org.apache.hadoop.mapred.OutputFormat outputFormat) { + /*OutputFormatContainer getOutputFormatContainer(HiveOutputFormat outputFormat) { return new FileOutputFormatContainer(outputFormat); - } + }*/ @Override public Configuration getConf() { diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java index 4f7a74a..8603fb9 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -25,12 +25,16 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; @@ -60,8 +64,7 @@ public static HCatSchema getTableSchema(Configuration conf) throws IOException { * @throws IOException when output should not be attempted */ @Override - public void checkOutputSpecs(JobContext context - ) throws IOException, InterruptedException { + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { getOutputFormat(context).checkOutputSpecs(context); } @@ -71,20 +74,23 @@ public void checkOutputSpecs(JobContext context * @return the output format instance * @throws IOException */ - protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) - throws IOException { + protected OutputFormat, HCatRecord> getOutputFormat(JobContext context) + throws IOException { OutputJobInfo jobInfo = getJobInfo(context.getConfiguration()); - HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), + HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); // Always configure storage handler with jobproperties/jobconf before calling any methods on it configureOutputStorageHandler(context); + + // If the OutputFormat is not a HiveOutputFormat, find a substitute. 
+ Table table = new Table(jobInfo.getTableInfo().getTable()); + Class outputFormatClass = table.getOutputFormatClass(); if (storageHandler instanceof FosterStorageHandler) { - return new FileOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); - } - else { - return new DefaultOutputFormatContainer(ReflectionUtils.newInstance( - storageHandler.getOutputFormatClass(),context.getConfiguration())); + return new FileOutputFormatContainer( + ReflectionUtils.newInstance(outputFormatClass, context.getConfiguration())); + } else { + return new DefaultOutputFormatContainer( + ReflectionUtils.newInstance(outputFormatClass, context.getConfiguration())); } } @@ -146,13 +152,6 @@ static void configureOutputStorageHandler( partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i)); } -// // re-home location, now that we know the rest of the partvals -// Table table = jobInfo.getTableInfo().getTable(); -// -// List partitionCols = new ArrayList(); -// for(FieldSchema schema : table.getPartitionKeys()) { -// partitionCols.add(schema.getName()); -// } jobInfo.setPartitionValues(partitionValues); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java index b651cb3..0cfeee1 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatMapRedUtil.java @@ -31,13 +31,16 @@ public class HCatMapRedUtil { - public static TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapreduce.TaskAttemptContext context) { - return createTaskAttemptContext(new JobConf(context.getConfiguration()), - org.apache.hadoop.mapred.TaskAttemptID.forName(context.getTaskAttemptID().toString()), - Reporter.NULL); + public static TaskAttemptContext createTaskAttemptContext( + org.apache.hadoop.mapreduce.TaskAttemptContext context) { + String taskAttemptId = context.getTaskAttemptID().toString(); + return createTaskAttemptContext(new JobConf(context.getConfiguration()), + org.apache.hadoop.mapred.TaskAttemptID.forName(taskAttemptId), + Reporter.NULL); } - public static org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, org.apache.hadoop.mapreduce.TaskAttemptID id) { + public static org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext( + Configuration conf, org.apache.hadoop.mapreduce.TaskAttemptID id) { return ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(conf,id); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java index 6947398..e77cc0c 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java @@ -31,11 +31,12 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Index; import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; @@ -43,17 +44,21 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.security.Credentials; + import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatSchema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** The OutputFormat to use to write data to HCatalog. The key value is ignored and - * should be given as null. The value is the HCatRecord to write.*/ +/** + * The OutputFormat to use to write data to HCatalog. The key value is ignored and + * should be given as null. The value is the HCatRecord to write + */ @InterfaceAudience.Public @InterfaceStability.Evolving public class HCatOutputFormat extends HCatBaseOutputFormat { @@ -63,6 +68,115 @@ private static int maxDynamicPartitions; private static boolean harRequested; + /* + * Builds the index list from the table. + */ + private static List buildIndexList(HiveMetaStoreClient client, + OutputJobInfo outputJobInfo) throws HCatException { + List indexList = null; + try { + indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), + outputJobInfo.getTableName(), Short.MAX_VALUE); + for (String indexName : indexList) { + Index index = client.getIndex(outputJobInfo.getDatabaseName(), + outputJobInfo.getTableName(), indexName); + if (!index.isDeferredRebuild()) { + throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, + "Store into a table with an automatic index from Pig/Mapreduce is not supported"); + } + } + } catch (Exception e) { + if (e instanceof HCatException) { + throw (HCatException) e; + } else { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } + } + return indexList; + } + + /* + * Obtains and validates the StorageDescriptor from the Table. + */ + private static StorageDescriptor getStorageDescriptor(Table table) + throws HCatException { + StorageDescriptor sd = table.getTTable().getSd(); + if (sd.isCompressed()) { + throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, + "Store into a compressed partition from Pig/Mapreduce is not supported"); + } + if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) { + throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, + "Store into a partition with bucket definition from Pig/Mapreduce is not supported"); + } + if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { + throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, + "Store into a partition with sorted column definition from Pig/Mapreduce is not " + + "supported"); + } + return sd; + } + + /* + * Sets the output partition values. + */ + private static void setPartitionValues(OutputJobInfo outputJobInfo, Table table, + Configuration conf) throws HCatException { + // Handle non-partitioned table. + if (table.getTTable().getPartitionKeysSize() == 0) { + if ((outputJobInfo.getPartitionValues() != null) + && (!outputJobInfo.getPartitionValues().isEmpty())) { + // Attempt made to save partition values in non-partitioned table - throw error. 
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Partition values specified for non-partitioned table"); + } + outputJobInfo.setPartitionValues(new HashMap()); + return; + } + + // Handle partitioned table, we expect partition values convert user specified map to have + // lower case key names + Map valueMap = new HashMap(); + if (outputJobInfo.getPartitionValues() != null) { + for (Map.Entry entry : outputJobInfo.getPartitionValues().entrySet()) { + valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); + } + } + + // Dynamic partition usecase - partition values were null, or not all were specified + // need to figure out which keys are not specified. + if ((outputJobInfo.getPartitionValues() == null) + || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())) { + List dynamicPartitioningKeys = new ArrayList(); + boolean firstItem = true; + for (FieldSchema fs : table.getPartitionKeys()) { + if (!valueMap.containsKey(fs.getName().toLowerCase())) { + dynamicPartitioningKeys.add(fs.getName().toLowerCase()); + } + } + + if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()) { + // If this isn't equal, then bogus key values have been inserted, error out. + throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Invalid partition keys specified"); + } + + outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); + String dynHash; + if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null) { + dynHash = String.valueOf(Math.random()); + } + conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); + + // If custom pattern is set in case of dynamic partitioning, configure custom path + String customPattern = conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN); + if (customPattern != null) { + HCatFileUtil.setCustomPath(customPattern, outputJobInfo); + } + } + outputJobInfo.setPartitionValues(valueMap); + } + /** * @see org.apache.hive.hcatalog.mapreduce.HCatOutputFormat#setOutput(org.apache.hadoop.conf.Configuration, Credentials, OutputJobInfo) */ @@ -81,37 +195,19 @@ public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOExce */ @SuppressWarnings("unchecked") public static void setOutput(Configuration conf, Credentials credentials, - OutputJobInfo outputJobInfo) throws IOException { + OutputJobInfo outputJobInfo) throws IOException { HiveMetaStoreClient client = null; - try { - HiveConf hiveConf = HCatUtil.getHiveConf(conf); client = HCatUtil.getHiveClient(hiveConf); Table table = HCatUtil.getTable(client, outputJobInfo.getDatabaseName(), - outputJobInfo.getTableName()); - - List indexList = client.listIndexNames(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), Short.MAX_VALUE); - - for (String indexName : indexList) { - Index index = client.getIndex(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName(), indexName); - if (!index.isDeferredRebuild()) { - throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a table with an automatic index from Pig/Mapreduce is not supported"); - } - } - StorageDescriptor sd = table.getTTable().getSd(); - - if (sd.isCompressed()) { - throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a compressed partition from Pig/Mapreduce is not supported"); - } + outputJobInfo.getTableName()); - if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) { - throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with bucket definition from 
Pig/Mapreduce is not supported"); - } + // Build the index list from the table. + List indexList = buildIndexList(client, outputJobInfo); - if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { - throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported"); - } + // Obtain and validate the StorageDescriptor. + StorageDescriptor sd = getStorageDescriptor(table); // Set up a common id hash for this job, so that when we create any temporary directory // later on, it is guaranteed to be unique. @@ -121,73 +217,19 @@ public static void setOutput(Configuration conf, Credentials credentials, } conf.set(HCatConstants.HCAT_OUTPUT_ID_HASH,idHash); - if (table.getTTable().getPartitionKeysSize() == 0) { - if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())) { - // attempt made to save partition values in non-partitioned table - throw error. - throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, - "Partition values specified for non-partitioned table"); - } - // non-partitioned table - outputJobInfo.setPartitionValues(new HashMap()); - - } else { - // partitioned table, we expect partition values - // convert user specified map to have lower case key names - Map valueMap = new HashMap(); - if (outputJobInfo.getPartitionValues() != null) { - for (Map.Entry entry : outputJobInfo.getPartitionValues().entrySet()) { - valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); - } - } - - if ((outputJobInfo.getPartitionValues() == null) - || (outputJobInfo.getPartitionValues().size() < table.getTTable().getPartitionKeysSize())) { - // dynamic partition usecase - partition values were null, or not all were specified - // need to figure out which keys are not specified. - List dynamicPartitioningKeys = new ArrayList(); - boolean firstItem = true; - for (FieldSchema fs : table.getPartitionKeys()) { - if (!valueMap.containsKey(fs.getName().toLowerCase())) { - dynamicPartitioningKeys.add(fs.getName().toLowerCase()); - } - } - - if (valueMap.size() + dynamicPartitioningKeys.size() != table.getTTable().getPartitionKeysSize()) { - // If this isn't equal, then bogus key values have been inserted, error out. - throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, "Invalid partition keys specified"); - } - - outputJobInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); - String dynHash; - if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null) { - dynHash = String.valueOf(Math.random()); - } - conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); - - // if custom pattern is set in case of dynamic partitioning, configure custom path - String customPattern = conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN); - if (customPattern != null) { - HCatFileUtil.setCustomPath(customPattern, outputJobInfo); - } - } - - outputJobInfo.setPartitionValues(valueMap); - } + // Set partition values. + setPartitionValues(outputJobInfo, table, conf); // To get around hbase failure on single node, see BUG-4383 conf.set("dfs.client.read.shortcircuit", "false"); - HCatSchema tableSchema = HCatUtil.extractSchema(table); - StorerInfo storerInfo = - InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters()); - - List partitionCols = new ArrayList(); - for (FieldSchema schema : table.getPartitionKeys()) { - partitionCols.add(schema.getName()); - } + // Create HiveStorageHandler. 
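setOutput() now delegates partition handling to the setPartitionValues() helper shown earlier, which lower-cases the user-supplied keys and treats any partition column without a value as a dynamic-partitioning key. The core of that computation as a stand-alone sketch, with illustrative names:

```java
// Sketch of the key normalization and dynamic-key detection done by setPartitionValues().
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionKeySketch {
  static List<String> dynamicKeys(List<String> tablePartitionCols, Map<String, String> userValues) {
    Map<String, String> valueMap = new HashMap<String, String>();
    if (userValues != null) {
      for (Map.Entry<String, String> e : userValues.entrySet()) {
        valueMap.put(e.getKey().toLowerCase(), e.getValue());
      }
    }
    List<String> dynamicPartitioningKeys = new ArrayList<String>();
    for (String col : tablePartitionCols) {
      if (!valueMap.containsKey(col.toLowerCase())) {
        dynamicPartitioningKeys.add(col.toLowerCase());
      }
    }
    // If the counts don't add up, the caller supplied a key that is not a partition column.
    if (valueMap.size() + dynamicPartitioningKeys.size() != tablePartitionCols.size()) {
      throw new IllegalArgumentException("Invalid partition keys specified");
    }
    return dynamicPartitioningKeys;
  }
}
```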
+ HCatSchema tableSchema = HCatUtil.extractSchema(table); + StorerInfo storerInfo = InternalUtil.extractStorerInfo( + table.getTTable().getSd(), table.getParameters()); HiveStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo); - //Serialize the output info into the configuration + // Serialize the output info into the configuration outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable())); outputJobInfo.setOutputSchema(tableSchema); harRequested = getHarRequested(hiveConf); @@ -195,21 +237,20 @@ public static void setOutput(Configuration conf, Credentials credentials, maxDynamicPartitions = getMaxDynamicPartitions(hiveConf); outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); + // Configure HiveStorageHandler. HCatUtil.configureOutputStorageHandler(storageHandler, conf, outputJobInfo); - Path tblPath = new Path(table.getTTable().getSd().getLocation()); - /* Set the umask in conf such that files/dirs get created with table-dir - * permissions. Following three assumptions are made: - * 1. Actual files/dirs creation is done by RecordWriter of underlying - * output format. It is assumed that they use default permissions while creation. - * 2. Default Permissions = FsPermission.getDefault() = 777. - * 3. UMask is honored by underlying filesystem. - */ - + // Set the umask in conf such that files/dirs get created with table-dir + // permissions. Following three assumptions are made: + // 1. Actual files/dirs creation is done by RecordWriter of underlying + // output format. It is assumed that they use default permissions while creation. + // 2. Default Permissions = FsPermission.getDefault() = 777. + // 3. UMask is honored by underlying filesystem. FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask( - tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission())); + tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission())); + // If security is enabled, handle security credentials. if (Security.getInstance().isSecurityEnabled()) { Security.getInstance().handleSecurity(credentials, outputJobInfo, client, conf, harRequested); } @@ -238,7 +279,8 @@ public static void setSchema(final Job job, final HCatSchema schema) throws IOEx * @param schema the schema for the data * @throws IOException */ - public static void setSchema(final Configuration conf, final HCatSchema schema) throws IOException { + public static void setSchema(final Configuration conf, final HCatSchema schema) + throws IOException { OutputJobInfo jobInfo = getJobInfo(conf); Map partMap = jobInfo.getPartitionValues(); setPartDetails(jobInfo, schema, partMap); @@ -246,7 +288,7 @@ public static void setSchema(final Configuration conf, final HCatSchema schema) } /** - * Get the record writer for the job. This uses the StorageHandler's default + * Get the record writer for the job. This uses the StorageHandler's default * OutputFormat to get the record writer. 
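For context, this is roughly how a client wires HCatOutputFormat into a job through the setOutput(), setSchema(), and getTableSchema() entry points touched above. Treat it as a hedged sketch: the database, table, partition value, and job name are illustrative.

```java
// Hedged driver-side sketch; only the HCatOutputFormat/OutputJobInfo calls shown in this
// patch are assumed, everything else is placeholder.
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public final class HCatWriteJobSketch {
  public static Job configure(Configuration conf) throws Exception {
    Job job = new Job(conf, "hcat-write-sketch");
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "2014-01-01"); // static partition; omit entries for dynamic ones

    // setOutput() contacts the metastore, validates the table, and serializes the
    // OutputJobInfo into the job configuration (the code path refactored above).
    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("default", "sample_table", partitionValues));

    // Reuse the table schema as the output schema; a real job could project a subset.
    HCatSchema schema = HCatOutputFormat.getTableSchema(job.getConfiguration());
    HCatOutputFormat.setSchema(job, schema);

    job.setOutputFormatClass(HCatOutputFormat.class);
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(DefaultHCatRecord.class);
    return job;
  }
}
```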
* @param context the information about the current task * @return a RecordWriter to write the output for the job @@ -254,13 +296,11 @@ public static void setSchema(final Configuration conf, final HCatSchema schema) * @throws InterruptedException */ @Override - public RecordWriter, HCatRecord> - getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { + public RecordWriter, HCatRecord> getRecordWriter( + TaskAttemptContext context) throws IOException, InterruptedException { return getOutputFormat(context).getRecordWriter(context); } - /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. @@ -270,8 +310,8 @@ public static void setSchema(final Configuration conf, final HCatSchema schema) * @throws InterruptedException */ @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context - ) throws IOException, InterruptedException { + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { return getOutputFormat(context).getOutputCommitter(context); } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java index 1980ef5..1b1acdb 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java @@ -102,8 +102,8 @@ private static InputJobInfo getInputJobInfo( hiveConf = new HiveConf(HCatInputFormat.class); } client = HCatUtil.getHiveClient(hiveConf); - Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(), - inputJobInfo.getTableName()); + Table table = new Table(client.getTable(inputJobInfo.getDatabaseName(), + inputJobInfo.getTableName())); List partInfoList = new ArrayList(); diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java index 9b97939..dd98949 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InternalUtil.java @@ -24,7 +24,10 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,9 +47,11 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; import org.apache.hive.hcatalog.data.schema.HCatSchema; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +77,6 @@ static StorerInfo extractStorerInfo(StorageDescriptor sd, Map pr hcatProperties.put(param.getKey(), param.getValue()); } - return new StorerInfo( sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(), 
properties.get(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE), @@ -139,40 +143,37 @@ private static ObjectInspector getObjectInspector(TypeInfo type) throws IOExcept } } - //TODO this has to find a better home, it's also hardcoded as default in hive would be nice + // TODO this has to find a better home, it's also hardcoded as default in hive would be nice // if the default was decided by the serde - static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo) - throws SerDeException { - SerDeUtils.initializeSerDe(serDe, conf, - getSerdeProperties(jobInfo.getTableInfo(), - jobInfo.getOutputSchema()), - null); + public static void initializeOutputSerDe(SerDe serDe, Properties tableProperties, + Configuration conf, OutputJobInfo jobInfo) throws SerDeException { + TableDesc tableDesc = Utilities.getTableDesc(new Table(jobInfo.getTableInfo().getTable())); + tableProperties.putAll(tableDesc.getProperties()); + + // column types must be separated by ',' and not ':' + List fields = HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields()); + tableProperties.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, + MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); + tableProperties.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, + MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); + + SerDeUtils.initializeSerDe(serDe, conf, tableProperties, null); } - static void initializeDeserializer(Deserializer deserializer, Configuration conf, - HCatTableInfo info, HCatSchema schema) throws SerDeException { - Properties props = getSerdeProperties(info, schema); - LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props); - SerDeUtils.initializeSerDe(deserializer, conf, props, null); - } + public static void initializeDeserializer(Deserializer deserializer, Configuration conf, + HCatTableInfo info, HCatSchema schema) throws SerDeException { + TableDesc tableDesc = Utilities.getTableDesc(new Table(info.getTable())); + Properties props = tableDesc.getProperties(); - private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s) - throws SerDeException { - Properties props = new Properties(); - List fields = HCatUtil.getFieldSchemaList(s.getFields()); + // column types must be separated by ',' and not ':' + List fields = HCatUtil.getFieldSchemaList(schema.getFields()); props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, - MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); + MetaStoreUtils.getColumnNamesFromFieldSchema(fields)); props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, - MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); - - // setting these props to match LazySimpleSerde - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "\\N"); - props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "1"); - - //add props from params set in table schema - props.putAll(info.getStorerInfo().getProperties()); + MetaStoreUtils.getColumnTypesFromFieldSchema(fields)); - return props; + LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props); + SerDeUtils.initializeSerDe(deserializer, conf, props, null); } static Reporter createReporter(TaskAttemptContext context) { diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java 
b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java index d83b003..302d18a 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputFormatContainer.java @@ -19,9 +19,11 @@ package org.apache.hive.hcatalog.mapreduce; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.OutputFormat; + import org.apache.hive.hcatalog.data.HCatRecord; /** @@ -34,19 +36,19 @@ * such as partitioning isn't supported. */ abstract class OutputFormatContainer extends OutputFormat, HCatRecord> { - private org.apache.hadoop.mapred.OutputFormat, ? super Writable> of; + private HiveOutputFormat of; /** * @param of OutputFormat this instance will contain */ - public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat, ? super Writable> of) { + public OutputFormatContainer(HiveOutputFormat of) { this.of = of; } /** * @return underlying OutputFormat */ - public org.apache.hadoop.mapred.OutputFormat getBaseOutputFormat() { + public HiveOutputFormat getBaseOutputFormat() { return of; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java index 5905b46..50c0f57 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/RecordWriterContainer.java @@ -24,29 +24,33 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hive.hcatalog.data.HCatRecord; /** * This class will contain an implementation of an RecordWriter. * See {@link OutputFormatContainer} for more information about containers. */ -abstract class RecordWriterContainer extends RecordWriter, HCatRecord> { +abstract class RecordWriterContainer extends RecordWriter, HCatRecord> { - private final org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseRecordWriter; + private FileSinkOperator.RecordWriter baseRecordWriter; /** * @param context current JobContext * @param baseRecordWriter RecordWriter that this instance will contain */ - public RecordWriterContainer(TaskAttemptContext context, - org.apache.hadoop.mapred.RecordWriter, ? 
super Writable> baseRecordWriter) { + public RecordWriterContainer(TaskAttemptContext context) { + this.baseRecordWriter = null; + } + + protected void setBaseRecordWriter(FileSinkOperator.RecordWriter baseRecordWriter) { this.baseRecordWriter = baseRecordWriter; } /** * @return underlying RecordWriter */ - public org.apache.hadoop.mapred.RecordWriter getBaseRecordWriter() { + public FileSinkOperator.RecordWriter getBaseRecordWriter() { return baseRecordWriter; } diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java index b3ea76e..afd2cfa 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java @@ -20,12 +20,18 @@ package org.apache.hive.hcatalog.mapreduce; import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hive.hcatalog.mapreduce.FileRecordWriterContainer; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.HCatRecord; @@ -41,16 +47,35 @@ * @throws IOException * @throws InterruptedException */ - public StaticPartitionFileRecordWriterContainer( - RecordWriter, ? 
super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { - super(baseWriter, context); + public StaticPartitionFileRecordWriterContainer(TaskAttemptContext context, + HiveOutputFormat baseOutputFormat) + throws IOException, InterruptedException { + super(context); + Configuration conf = context.getConfiguration(); + JobConf jobConf = new JobConf(conf); + Path parentDir = new Path(conf.get("mapred.work.output.dir")); + Path childPath = new Path(parentDir, + FileOutputFormat.getUniqueName(jobConf, "part")); + + boolean isCompressed = conf.getBoolean("mapred.output.compress", false); + Class valueClass = null; + try { + valueClass = (Class) + Class.forName(conf.get("mapred.output.value.class")); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + FileSinkOperator.RecordWriter recordWriter = + baseOutputFormat.getHiveRecordWriter( + jobConf, childPath, valueClass, isCompressed, tableProperties, + InternalUtil.createReporter(context)); + setBaseRecordWriter(recordWriter); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { - Reporter reporter = InternalUtil.createReporter(context); - getBaseRecordWriter().close(reporter); + getBaseRecordWriter().close(false); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestUtil.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestUtil.java new file mode 100644 index 0000000..86db070 --- /dev/null +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestUtil.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.common; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +/** + * Test utilities for selectively disabling specific test methods for given storage formats. + */ +public class TestUtil { + private static final Map> SAMPLE_DISABLED_TESTS_MAP = + new HashMap>() {{ + put("test", new HashSet() {{ + add("testShouldSkip"); + }}); + }}; + + /** + * Determine whether the caller test method is in a set of disabled test methods for a given + * storage format. + * + * @param storageFormat The name of the storage format used in a STORED AS clause. + * @param disabledTestsMap Map of storage format name to set of test method names that indicate + * which test methods should not run against the given storage format. + * @return True if the caller test method should be skipped for the given storage format. 
+ */ + public static boolean shouldSkip(String storageFormat, Map> disabledTestsMap) { + final StackTraceElement[] elements = Thread.currentThread().getStackTrace(); + // The "bottom" of the call stack is at the front of the array. The elements are as follows: + // [0] getStackTrace() + // [1] shouldSkip() + // [2] caller test method + String methodName = elements[2].getMethodName(); + if (!disabledTestsMap.containsKey(storageFormat)) { + return false; + } + + Set disabledMethods = disabledTestsMap.get(storageFormat); + return disabledMethods.contains(methodName); + } + + @Test + public void testShouldSkip() { + assertTrue(TestUtil.shouldSkip("test", SAMPLE_DISABLED_TESTS_MAP)); + } + + @Test + public void testShouldNotSkip() { + assertFalse(TestUtil.shouldSkip("test", SAMPLE_DISABLED_TESTS_MAP)); + assertFalse(TestUtil.shouldSkip("foo", SAMPLE_DISABLED_TESTS_MAP)); + } +} diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java index ee57f3f..355fd15 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java @@ -66,7 +66,6 @@ import junit.framework.Assert; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.runner.RunWith; @@ -97,38 +96,43 @@ private static FileSystem fs; private String externalTableLocation = null; + protected String storageFormat; protected String tableName; protected String serdeClass; protected String inputFormatClass; protected String outputFormatClass; - /** - * List of SerDe classes that the HCatalog core tests will not be run against. - */ - public static final Set DISABLED_SERDES = ImmutableSet.of( - AvroSerDe.class.getName(), - ParquetHiveSerDe.class.getName()); - @Parameterized.Parameters public static Collection generateParameters() { - return StorageFormats.asParameters(); + List parameters = (List) StorageFormats.asParameters(); + + for (int i = 0; i < parameters.size(); i++) { + Object[] params = parameters.get(i); + System.err.println("[" + i + "] " + params[0]); + } + + return parameters; } /** * Test constructor that sets the storage format class names provided by the test parameter. */ - public HCatMapReduceTest(String name, String serdeClass, String inputFormatClass, + public HCatMapReduceTest(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { + System.err.println("\n==> " + storageFormat); + this.storageFormat = storageFormat; this.serdeClass = serdeClass; this.inputFormatClass = inputFormatClass; this.outputFormatClass = outputFormatClass; - this.tableName = TABLE_NAME + "_" + name; + this.tableName = TABLE_NAME + "_" + storageFormat; } protected abstract List getPartitionKeys(); protected abstract List getTableColumns(); + protected abstract Map> getDisabledStorageFormats(); + protected Boolean isTableExternal() { return false; } @@ -173,10 +177,6 @@ public void deleteTable() throws Exception { @Before public void createTable() throws Exception { - // Use Junit's Assume to skip running this fixture against any storage formats whose - // SerDe is in the disabled serdes list. - Assume.assumeTrue(!DISABLED_SERDES.contains(serdeClass)); - String databaseName = (dbName == null) ? 
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; try { client.dropTable(databaseName, tableName); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java index 0d87c6c..6809d15 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java @@ -21,18 +21,24 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.hive.hcatalog.data.DefaultHCatRecord; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; @@ -44,8 +50,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; public class TestHCatDynamicPartitioned extends HCatMapReduceTest { @@ -55,10 +62,15 @@ protected static final int NUM_RECORDS = 20; protected static final int NUM_PARTITIONS = 5; - public TestHCatDynamicPartitioned(String formatName, String serdeClass, String inputFormatClass, + @Override + protected Map> getDisabledStorageFormats() { + return new HashMap>(); + } + + public TestHCatDynamicPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); - tableName = "testHCatDynamicPartitionedTable_" + formatName; + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); + tableName = "testHCatDynamicPartitionedTable_" + storageFormat; generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); generateDataColumns(); } @@ -104,6 +116,7 @@ protected static void generateWriteRecords(int max, int mod, int offset) { */ @Test public void testHCatDynamicPartitionedTable() throws Exception { + assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats())); runHCatDynamicPartitionedTable(true, null); } @@ -113,12 +126,14 @@ public void testHCatDynamicPartitionedTable() throws Exception { */ @Test public void testHCatDynamicPartitionedTableMultipleTask() throws Exception { + assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats())); runHCatDynamicPartitionedTable(false, null); } protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask, String customDynamicPathPattern) throws Exception { generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + assertEquals(NUM_RECORDS, writeRecords.size()); runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, 
customDynamicPathPattern); runMRRead(NUM_RECORDS); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java index 58764a5..32e60d0 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java @@ -19,16 +19,30 @@ package org.apache.hive.hcatalog.mapreduce; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hive.hcatalog.common.TestUtil; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assume.assumeTrue; + public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartitioned { - public TestHCatExternalDynamicPartitioned(String formatName, String serdeClass, + @Override + protected Map> getDisabledStorageFormats() { + return new HashMap>(); + } + + public TestHCatExternalDynamicPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); - tableName = "testHCatExternalDynamicPartitionedTable_" + formatName; + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); + tableName = "testHCatExternalDynamicPartitionedTable_" + storageFormat; generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); generateDataColumns(); } @@ -44,6 +58,7 @@ protected Boolean isTableExternal() { */ @Test public void testHCatExternalDynamicCustomLocation() throws Exception { + assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats())); runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}"); } diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java index 6e060c0..de48141 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java @@ -20,10 +20,10 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned { - public TestHCatExternalNonPartitioned(String formatName, String serdeName, + public TestHCatExternalNonPartitioned(String storageFormat, String serdeName, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeName, inputFormatClass, outputFormatClass); + super(storageFormat, serdeName, inputFormatClass, outputFormatClass); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java index 9f16b3b..a4591bf 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalPartitioned.java @@ -20,10 +20,10 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatExternalPartitioned extends TestHCatPartitioned { - public TestHCatExternalPartitioned(String 
formatName, String serdeClass, + public TestHCatExternalPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java index 5b18739..b6d5ed3 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java @@ -20,10 +20,10 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutableDynamicPartitioned extends TestHCatDynamicPartitioned { - public TestHCatMutableDynamicPartitioned(String formatName, String serdeClass, + public TestHCatMutableDynamicPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java index 354ae10..fe84431 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java @@ -20,10 +20,10 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned { - public TestHCatMutableNonPartitioned(String formatName, String serdeClass, + public TestHCatMutableNonPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java index a22a993..2df5948 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java @@ -20,10 +20,10 @@ package org.apache.hive.hcatalog.mapreduce; public class TestHCatMutablePartitioned extends TestHCatPartitioned { - public TestHCatMutablePartitioned(String formatName, String serdeClass, + public TestHCatMutablePartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); } @Override diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java index 174a92f..5140d52 100644 --- 
a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java @@ -22,14 +22,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.hive.hcatalog.data.DefaultHCatRecord; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; @@ -38,6 +42,7 @@ import org.junit.Test; import static junit.framework.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -46,11 +51,16 @@ private static List writeRecords; static List partitionColumns; - public TestHCatNonPartitioned(String formatName, String serdeClass, String inputFormatClass, + @Override + protected Map> getDisabledStorageFormats() { + return new HashMap>(); + } + + public TestHCatNonPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); dbName = null; //test if null dbName works ("default" is used) - tableName = "testHCatNonPartitionedTable_" + formatName; + tableName = "testHCatNonPartitionedTable_" + storageFormat; writeRecords = new ArrayList(); for (int i = 0; i < 20; i++) { @@ -84,6 +94,7 @@ public TestHCatNonPartitioned(String formatName, String serdeClass, String input @Test public void testHCatNonPartitionedTable() throws Exception { + assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats())); Map partitionMap = new HashMap(); runMRCreate(null, partitionColumns, writeRecords, 10, true); diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java index a386415..cdaefda 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java @@ -22,14 +22,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.hive.hcatalog.data.DefaultHCatRecord; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; @@ -38,21 +42,31 @@ import org.junit.BeforeClass; import org.junit.Test; -import static 
junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assume.assumeTrue; public class TestHCatPartitioned extends HCatMapReduceTest { private static List writeRecords; private static List partitionColumns; - public TestHCatPartitioned(String formatName, String serdeClass, String inputFormatClass, + @Override + protected Map> getDisabledStorageFormats() { + return new HashMap>() {{ + put(IOConstants.PARQUETFILE, new HashSet() {{ + add("testHCatPartitionedTable"); + }}); + }}; + } + + public TestHCatPartitioned(String storageFormat, String serdeClass, String inputFormatClass, String outputFormatClass) throws Exception { - super(formatName, serdeClass, inputFormatClass, outputFormatClass); - tableName = "testHCatPartitionedTable_" + formatName; + super(storageFormat, serdeClass, inputFormatClass, outputFormatClass); + tableName = "testHCatPartitionedTable_" + storageFormat; writeRecords = new ArrayList(); for (int i = 0; i < 20; i++) { @@ -88,7 +102,7 @@ public TestHCatPartitioned(String formatName, String serdeClass, String inputFor @Test public void testHCatPartitionedTable() throws Exception { - + assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats())); Map partitionMap = new HashMap(); partitionMap.put("part1", "p1value1"); partitionMap.put("part0", "p0value1"); @@ -101,7 +115,7 @@ public void testHCatPartitionedTable() throws Exception { runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); - //Test for duplicate publish -- this will either fail on job creation time + // Test for duplicate publish -- this will either fail on job creation time // and throw an exception, or will fail at runtime, and fail the job. 
IOException exc = null; @@ -117,7 +131,7 @@ public void testHCatPartitionedTable() throws Exception { assertNull(exc); } - //Test for publish with invalid partition key name + // Test for publish with invalid partition key name exc = null; partitionMap.clear(); partitionMap.put("px1", "p1value2"); @@ -133,7 +147,7 @@ public void testHCatPartitionedTable() throws Exception { assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); } - //Test for publish with missing partition key values + // Test for publish with missing partition key values exc = null; partitionMap.clear(); partitionMap.put("px", "p1value2"); @@ -148,8 +162,7 @@ public void testHCatPartitionedTable() throws Exception { assertTrue(exc instanceof HCatException); assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); - - //Test for null partition value map + // Test for null partition value map exc = null; try { runMRCreate(null, partitionColumns, writeRecords, 20, false); @@ -158,21 +171,22 @@ public void testHCatPartitionedTable() throws Exception { } assertTrue(exc == null); -// assertTrue(exc instanceof HCatException); -// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); + //assertTrue(exc instanceof HCatException); + //assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); + // With dynamic partitioning, it is not an error that the specified partition keyValues have no values - //Read should get 10 + 20 rows if immutable, 50 (10+20+20) if mutable - if (isTableImmutable()){ + // Read should get 10 + 20 rows if immutable, 50 (10+20+20) if mutable + if (isTableImmutable()) { runMRRead(30); } else { runMRRead(50); } - //Read with partition filter + // Read with partition filter runMRRead(10, "part1 = \"p1value1\""); runMRRead(10, "part0 = \"p0value1\""); - if (isTableImmutable()){ + if (isTableImmutable()) { runMRRead(20, "part1 = \"p1value2\""); runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\""); runMRRead(20, "part0 = \"p0value2\""); @@ -189,26 +203,24 @@ public void testHCatPartitionedTable() throws Exception { hiveReadTest(); } - - //test that new columns gets added to table schema + /* + * Test that new columns get added to the table schema.
+ */ private void tableSchemaTest() throws Exception { - HCatSchema tableSchema = getTableSchema(); - assertEquals(4, tableSchema.getFields().size()); - //Update partition schema to have 3 fields - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, ""))); + // Update partition schema to have 3 fields + partitionColumns.add( + HCatSchemaUtils.getHCatFieldSchema( + new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, ""))); writeRecords = new ArrayList(); - for (int i = 0; i < 20; i++) { List objList = new ArrayList(); - objList.add(i); objList.add("strvalue" + i); objList.add("str2value" + i); - writeRecords.add(new DefaultHCatRecord(objList)); } @@ -220,7 +232,7 @@ private void tableSchemaTest() throws Exception { tableSchema = getTableSchema(); - //assert that c3 has got added to table schema + // Assert that c3 has been added to the table schema assertEquals(5, tableSchema.getFields().size()); assertEquals("c1", tableSchema.getFields().get(0).getName()); assertEquals("c2", tableSchema.getFields().get(1).getName()); @@ -228,7 +240,7 @@ private void tableSchemaTest() throws Exception { assertEquals("part1", tableSchema.getFields().get(3).getName()); assertEquals("part0", tableSchema.getFields().get(4).getName()); - //Test that changing column data type fails + // Test that changing column data type fails partitionMap.clear(); partitionMap.put("part1", "p1value6"); partitionMap.put("part0", "p0value6"); @@ -248,7 +260,7 @@ private void tableSchemaTest() throws Exception { assertTrue(exc instanceof HCatException); assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType()); - //Test that partition key is not allowed in data + // Test that partition key is not allowed in data partitionColumns = new ArrayList(); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""))); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""))); @@ -258,12 +270,10 @@ private void tableSchemaTest() throws Exception { List recordsContainingPartitionCols = new ArrayList(20); for (int i = 0; i < 20; i++) { List objList = new ArrayList(); - objList.add(i); objList.add("c2value" + i); objList.add("c3value" + i); objList.add("p1value6"); - recordsContainingPartitionCols.add(new DefaultHCatRecord(objList)); } @@ -290,11 +300,11 @@ private void tableSchemaTest() throws Exception { } } - //check behavior while change the order of columns + /* + * Check behavior when changing the order of columns + */ private void columnOrderChangeTest() throws Exception { - HCatSchema tableSchema = getTableSchema(); - assertEquals(5, tableSchema.getFields().size()); partitionColumns = new ArrayList(); @@ -302,16 +312,12 @@ private void columnOrderChangeTest() throws Exception { partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", serdeConstants.STRING_TYPE_NAME, ""))); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""))); - writeRecords = new ArrayList(); - for (int i = 0; i < 10; i++) { List objList = new ArrayList(); - objList.add(i); objList.add("co strvalue" + i); objList.add("co str2value" + i); - writeRecords.add(new DefaultHCatRecord(objList)); } @@ -330,38 +336,35 @@ private void columnOrderChangeTest() throws Exception { assertTrue(exc instanceof HCatException); assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException)
exc).getErrorType()); - partitionColumns = new ArrayList(); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", serdeConstants.INT_TYPE_NAME, ""))); partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", serdeConstants.STRING_TYPE_NAME, ""))); writeRecords = new ArrayList(); - for (int i = 0; i < 10; i++) { List objList = new ArrayList(); - objList.add(i); objList.add("co strvalue" + i); - writeRecords.add(new DefaultHCatRecord(objList)); } runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); if (isTableImmutable()){ - //Read should get 10 + 20 + 10 + 10 + 20 rows + // Read should get 10 + 20 + 10 + 10 + 20 rows runMRRead(70); } else { - runMRRead(90); // +20 from the duplicate publish + // +20 from the duplicate publish + runMRRead(90); } } - //Test that data inserted through hcatoutputformat is readable from hive + /* + * Test that data inserted through hcatoutputformat is readable from hive. + */ private void hiveReadTest() throws Exception { - String query = "select * from " + tableName; int retCode = driver.run(query).getResponseCode(); - if (retCode != 0) { throw new Exception("Error " + retCode + " running query " + query); } @@ -369,11 +372,11 @@ private void hiveReadTest() throws Exception { ArrayList res = new ArrayList(); driver.getResults(res); if (isTableImmutable()){ - //Read should get 10 + 20 + 10 + 10 + 20 rows + // Read should get 10 + 20 + 10 + 10 + 20 rows assertEquals(70, res.size()); } else { - assertEquals(90, res.size()); // +20 from the duplicate publish + // +20 from the duplicate publish + assertEquals(90, res.size()); } - } } diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java index 36221b7..55a0276 100644 --- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java +++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java @@ -206,7 +206,7 @@ public Table getTable(String location, String hcatServerUri, String hcatServerPr HiveMetaStoreClient client = null; try { client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class, job); - table = HCatUtil.getTable(client, dbName, tableName); + table = new Table(client.getTable(dbName, tableName)); } catch (NoSuchObjectException nsoe) { throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend } catch (Exception e) { diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java index 5eabba1..d877ecd 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java @@ -54,6 +54,7 @@ import org.apache.hive.hcatalog.HcatTestUtils; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.hive.hcatalog.data.Pair; import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java 
b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java index 447f39f..2dae9ab 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderComplexSchema.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.io.StorageFormats; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.pig.ExecType; import org.apache.pig.PigServer; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java index a380f61..de2c3c4 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java @@ -43,6 +43,7 @@ import org.apache.hive.hcatalog.HcatTestUtils; import org.apache.hive.hcatalog.mapreduce.HCatBaseTest; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java index 0c3ec8b..b9f4b63 100644 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java +++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerMulti.java @@ -41,6 +41,7 @@ import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hive.hcatalog.data.Pair; +import org.apache.hive.hcatalog.common.TestUtil; import org.apache.pig.ExecType; import org.apache.pig.PigServer; diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestUtil.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestUtil.java deleted file mode 100644 index 8a652f0..0000000 --- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestUtil.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hive.hcatalog.pig; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.junit.Test; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -/** - * Test utilities for selectively disabling specific test methods for given storage formats. 
- */ -public class TestUtil { - private static final Map> SAMPLE_DISABLED_TESTS_MAP = - new HashMap>() {{ - put("test", new HashSet() {{ - add("testShouldSkip"); - }}); - }}; - - /** - * Determine whether the caller test method is in a set of disabled test methods for a given - * storage format. - * - * @param storageFormat The name of the storage format used in a STORED AS clause. - * @param disabledTestsMap Map of storage format name to set of test method names that indicate - * which test methods should not run against the given storage format. - * @return True if the caller test method should be skipped for the given storage format. - */ - public static boolean shouldSkip(String storageFormat, Map> disabledTestsMap) { - final StackTraceElement[] elements = Thread.currentThread().getStackTrace(); - // The "bottom" of the call stack is at the front of the array. The elements are as follows: - // [0] getStackTrace() - // [1] shouldSkip() - // [2] caller test method - String methodName = elements[2].getMethodName(); - if (!disabledTestsMap.containsKey(storageFormat)) { - return false; - } - - Set disabledMethods = disabledTestsMap.get(storageFormat); - return disabledMethods.contains(methodName); - } - - @Test - public void testShouldSkip() { - assertTrue(TestUtil.shouldSkip("test", SAMPLE_DISABLED_TESTS_MAP)); - } - - @Test - public void testShouldNotSkip() { - assertFalse(TestUtil.shouldSkip("test", SAMPLE_DISABLED_TESTS_MAP)); - assertFalse(TestUtil.shouldSkip("foo", SAMPLE_DISABLED_TESTS_MAP)); - } -} diff --git a/hcatalog/webhcat/svr/.gitignore b/hcatalog/webhcat/svr/.gitignore new file mode 100644 index 0000000..916e17c --- /dev/null +++ b/hcatalog/webhcat/svr/.gitignore @@ -0,0 +1 @@ +dependency-reduced-pom.xml diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 9695346..64781ae 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -105,7 +105,7 @@ public void initialize(Configuration configuration, Properties properties) throw columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); List columnComments; - if (columnCommentProperty.isEmpty()) { + if (columnCommentProperty == null || columnCommentProperty.isEmpty()) { columnComments = new ArrayList(); } else { columnComments = Arrays.asList(columnCommentProperty.split(","));
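
Reviewer note on the umask handling in HCatOutputFormat.setOutput() above: the single FsPermission.setUMask(...) line performs a computation that is easy to misread. The standalone sketch below spells it out; the class and method names are made up for illustration, and only the FsPermission calls themselves are taken from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    // Hypothetical helper illustrating the umask trick used in setOutput().
    public class TablePermissionUmask {
      public static void applyTableUmask(Configuration conf, Path tableDir) throws java.io.IOException {
        // Permission bits of the existing table directory.
        FsPermission tablePerm =
            tableDir.getFileSystem(conf).getFileStatus(tableDir).getPermission();
        // getDefault() is 777; applyUMask(tablePerm) clears the table-permission bits from 777,
        // i.e. it produces the complement of the table directory's permission.
        FsPermission umask = FsPermission.getDefault().applyUMask(tablePerm);
        // Installing that complement as the job umask means files later created with the
        // default 777 end up with exactly the table directory's permission bits.
        FsPermission.setUMask(conf, umask);
      }
    }

This only holds under the three assumptions listed in the patch comment: the underlying RecordWriter creates files with default permissions, the default is 777, and the target filesystem honors the umask.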
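
Reviewer note on the test changes: the storage-format skip mechanism (TestUtil.shouldSkip() plus the getDisabledStorageFormats() override in each HCatMapReduceTest subclass) is used roughly as in the sketch below. This is an illustrative, self-contained example, not code from the patch: ExampleStorageFormatTest, testExampleQuery, and the hard-coded storageFormat are hypothetical, while TestUtil, IOConstants.PARQUETFILE, and the assumeTrue pattern come from the diff above.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hive.hcatalog.common.TestUtil;
    import org.junit.Test;

    import static org.junit.Assume.assumeTrue;

    // Hypothetical test class; the real HCatMapReduceTest subclasses receive storageFormat
    // from the @Parameterized.Parameters supplied by StorageFormats.asParameters().
    public class ExampleStorageFormatTest {
      private final String storageFormat = IOConstants.PARQUETFILE;

      // Mirrors the getDisabledStorageFormats() override added to TestHCatPartitioned:
      // storage format name -> test methods that should not run against it.
      protected Map<String, Set<String>> getDisabledStorageFormats() {
        return new HashMap<String, Set<String>>() {{
          put(IOConstants.PARQUETFILE, new HashSet<String>() {{
            add("testExampleQuery");
          }});
        }};
      }

      @Test
      public void testExampleQuery() throws Exception {
        // shouldSkip() walks the call stack to find this method's name, so it must be called
        // directly from the test method. Here the (format, method) pair is disabled, so
        // assumeTrue(false) marks the test as skipped rather than failed.
        assumeTrue(!TestUtil.shouldSkip(storageFormat, getDisabledStorageFormats()));
        // ... actual read/write assertions would go here ...
      }
    }

Because shouldSkip() identifies its caller via Thread.currentThread().getStackTrace() (stack index 2), it only works when invoked directly from the test method body, which is how the patch calls it in the TestHCat* tests.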