From 49b1f0459f11133f4ad1d6ca80114ba58417f40c Mon Sep 17 00:00:00 2001
From: "David Z. Chen"
Date: Fri, 13 Jun 2014 14:27:31 -0700
Subject: [PATCH] HIVE-7094: Reformat with Hive formatting styles.

---
 .../DynamicPartitionFileRecordWriterContainer.java | 118 ++++++++-------------
 .../mapreduce/FileRecordWriterContainer.java       |  44 ++++----
 .../StaticPartitionFileRecordWriterContainer.java  |  17 ++-
 3 files changed, 71 insertions(+), 108 deletions(-)

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
index ff6dba8..4df912a 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java
@@ -48,16 +48,14 @@ import org.apache.hive.hcatalog.data.HCatRecord;
 
 /**
- * Record writer container for tables using dynamic partitioning.
- * See {@link FileOutputFormatContainer} for more information
+ * Record writer container for tables using dynamic partitioning. See
+ * {@link FileOutputFormatContainer} for more information
  */
 class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
   private final List<Integer> dynamicPartCols;
   private int maxDynamicPartitions;
-  private final Map<String, RecordWriter<? super WritableComparable<?>,
-      ? super Writable>> baseDynamicWriters;
+  private final Map<String, RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
   private final Map<String, SerDe> baseDynamicSerDe;
   private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
   private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
@@ -71,44 +69,37 @@
    * @throws InterruptedException
    */
   public DynamicPartitionFileRecordWriterContainer(
-      RecordWriter<
-          ? super WritableComparable<?>,
-          ? super Writable> baseWriter,
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     super(baseWriter, context);
     maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
     dynamicPartCols = jobInfo.getPosOfDynPartCols();
     if (dynamicPartCols == null) {
-      throw new HCatException("It seems that setSchema() is not called on " +
-          "HCatOutputFormat. Please make sure that method is called.");
+      throw new HCatException("It seems that setSchema() is not called on "
+          + "HCatOutputFormat. Please make sure that method is called.");
     }
 
     this.baseDynamicSerDe = new HashMap<String, SerDe>();
-    this.baseDynamicWriters = new HashMap<String, RecordWriter<? super WritableComparable<?>,
-        ? super Writable>>();
-    this.baseDynamicCommitters = new HashMap<
-        String, org.apache.hadoop.mapred.OutputCommitter>();
-    this.dynamicContexts = new HashMap<
-        String, org.apache.hadoop.mapred.TaskAttemptContext>();
+    this.baseDynamicWriters =
+        new HashMap<String, RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+    this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
+    this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
     this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
     this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
   }
 
   @Override
-  public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     Reporter reporter = InternalUtil.createReporter(context);
-    for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter
-        : baseDynamicWriters.values()) {
+    for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters
+        .values()) {
       // We are in RecordWriter.close() make sense that the context would be
       // TaskInputOutput.
       bwriter.close(reporter);
     }
-    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry
-        : baseDynamicCommitters.entrySet()) {
-      org.apache.hadoop.mapred.TaskAttemptContext currContext =
-          dynamicContexts.get(entry.getKey());
+    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters
+        .entrySet()) {
+      org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
       OutputCommitter baseOutputCommitter = entry.getValue();
       if (baseOutputCommitter.needsTaskCommit(currContext)) {
         baseOutputCommitter.commitTask(currContext);
@@ -117,8 +108,7 @@ public void close(TaskAttemptContext context)
   }
 
   @Override
-  protected LocalFileWriter getLocalFileWriter(HCatRecord value)
-      throws IOException, HCatException {
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
     OutputJobInfo localJobInfo = null;
     // Calculate which writer to use from the remaining values - this needs to
     // be done before we delete cols.
@@ -129,37 +119,32 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value)
     String dynKey = dynamicPartValues.toString();
     if (!baseDynamicWriters.containsKey(dynKey)) {
-      if ((maxDynamicPartitions != -1) &&
-          (baseDynamicWriters.size() > maxDynamicPartitions)) {
+      if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
         throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-            "Number of dynamic partitions being created "
-                + "exceeds configured max allowable partitions["
-                + maxDynamicPartitions
-                + "], increase parameter ["
-                + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-                + "] if needed.");
+            "Number of dynamic partitions being created "
+                + "exceeds configured max allowable partitions[" + maxDynamicPartitions
+                + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                + "] if needed.");
       }
 
       org.apache.hadoop.mapred.TaskAttemptContext currTaskContext =
           HCatMapRedUtil.createTaskAttemptContext(context);
       configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
-      localJobInfo = HCatBaseOutputFormat.getJobInfo(
-          currTaskContext.getConfiguration());
+      localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
 
       // Setup serDe.
-      SerDe currSerDe = ReflectionUtils.newInstance(
-          storageHandler.getSerDeClass(), currTaskContext.getJobConf());
+      SerDe currSerDe =
+          ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
       try {
-        InternalUtil.initializeOutputSerDe(
-            currSerDe, currTaskContext.getConfiguration(), localJobInfo);
+        InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(),
+            localJobInfo);
       } catch (SerDeException e) {
         throw new IOException("Failed to initialize SerDe", e);
       }
 
       // create base OutputFormat
       org.apache.hadoop.mapred.OutputFormat baseOF =
-          ReflectionUtils.newInstance(
-              storageHandler.getOutputFormatClass(),
+          ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),
              currTaskContext.getJobConf());
 
       // We are skipping calling checkOutputSpecs() for each partition
@@ -184,54 +169,43 @@ protected LocalFileWriter getLocalFileWriter(HCatRecord value)
       // Recreate to refresh jobConf of currTask context.
       currTaskContext =
-          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
-              currTaskContext.getTaskAttemptID(),
-              currTaskContext.getProgressible());
+          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+              currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible());
 
       // Set temp location.
       currTaskContext.getConfiguration().set(
          "mapred.work.output.dir",
-          new FileOutputCommitter(
-              new Path(localJobInfo.getLocation()), currTaskContext)
-            .getWorkPath().toString());
+          new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext)
+              .getWorkPath().toString());
 
       // Set up task.
       baseOutputCommitter.setupTask(currTaskContext);
 
-      Path parentDir = new Path(
-          currTaskContext.getConfiguration().get("mapred.work.output.dir"));
-      Path childPath = new Path(
-          parentDir,
-          FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
+      Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+      Path childPath =
+          new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
 
-      RecordWriter baseRecordWriter = baseOF.getRecordWriter(
-          parentDir.getFileSystem(currTaskContext.getConfiguration()),
-          currTaskContext.getJobConf(),
-          childPath.toString(),
-          InternalUtil.createReporter(currTaskContext));
+      RecordWriter baseRecordWriter =
+          baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()),
+              currTaskContext.getJobConf(), childPath.toString(),
+              InternalUtil.createReporter(currTaskContext));
 
       baseDynamicWriters.put(dynKey, baseRecordWriter);
       baseDynamicSerDe.put(dynKey, currSerDe);
       baseDynamicCommitters.put(dynKey, baseOutputCommitter);
       dynamicContexts.put(dynKey, currTaskContext);
-      dynamicObjectInspectors.put(
-          dynKey,
+      dynamicObjectInspectors.put(dynKey,
          InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
-      dynamicOutputJobInfo.put(
-          dynKey,
-          HCatOutputFormat.getJobInfo(
-              dynamicContexts.get(dynKey).getConfiguration()));
+      dynamicOutputJobInfo.put(dynKey,
+          HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
     }
 
-    return new LocalFileWriter(
-        baseDynamicWriters.get(dynKey),
-        dynamicObjectInspectors.get(dynKey),
-        baseDynamicSerDe.get(dynKey),
-        dynamicOutputJobInfo.get(dynKey));
+    return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey),
+        baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey));
   }
 
-  protected void configureDynamicStorageHandler(
-      JobContext context, List<String> dynamicPartVals) throws IOException {
+  protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals)
+      throws IOException {
     HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
   }
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
index d2c386d..2a883d6 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
@@ -48,8 +48,8 @@ import org.apache.hive.hcatalog.data.HCatRecord;
 
 /**
- * Part of the FileOutput*Container classes
- * See {@link FileOutputFormatContainer} for more information
+ * Part of the FileOutput*Container classes See {@link FileOutputFormatContainer} for more
+ * information
  */
 abstract class FileRecordWriterContainer extends RecordWriterContainer {
@@ -70,21 +70,18 @@
    */
   public FileRecordWriterContainer(
       RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
-      TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      TaskAttemptContext context) throws IOException, InterruptedException {
     super(context, baseWriter);
     this.context = context;
     jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
-    storageHandler = HCatUtil.getStorageHandler(
-        context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
-    serDe = ReflectionUtils.newInstance(
-        storageHandler.getSerDeClass(), context.getConfiguration());
-    objectInspector = InternalUtil.createStructObjectInspector(
-        jobInfo.getOutputSchema());
+    storageHandler =
+        HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo()
+            .getStorerInfo());
+    serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
+    objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
     try {
-      InternalUtil.initializeOutputSerDe(
-          serDe, context.getConfiguration(), jobInfo);
+      InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
     } catch (SerDeException e) {
       throw new IOException("Failed to inialize SerDe", e);
     }
@@ -92,8 +89,8 @@ public FileRecordWriterContainer(
 
     // If partition columns occur in data, we want to remove them.
     partColsToDel = jobInfo.getPosOfPartCols();
     if (partColsToDel == null) {
-      throw new HCatException("It seems that setSchema() is not called on " +
-          "HCatOutputFormat. Please make sure that method is called.");
+      throw new HCatException("It seems that setSchema() is not called on "
+          + "HCatOutputFormat. Please make sure that method is called.");
     }
   }
@@ -104,16 +101,15 @@ public HiveStorageHandler getStorageHandler() {
     return storageHandler;
   }
 
-  abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value)
-      throws IOException, HCatException;
+  abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException,
+      HCatException;
 
   @Override
-  public void write(WritableComparable key, HCatRecord value)
-      throws IOException, InterruptedException {
+  public void write(WritableComparable key, HCatRecord value) throws IOException,
+      InterruptedException {
     LocalFileWriter localFileWriter = getLocalFileWriter(value);
     RecordWriter localWriter = localFileWriter.getLocalWriter();
-    ObjectInspector localObjectInspector =
-        localFileWriter.getLocalObjectInspector();
+    ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector();
     SerDe localSerDe = localFileWriter.getLocalSerDe();
     OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo();
 
@@ -123,8 +119,7 @@ public void write(WritableComparable key, HCatRecord value)
     // The key given by user is ignored
     try {
-      localWriter.write(
-          NullWritable.get(),
+      localWriter.write(NullWritable.get(),
          localSerDe.serialize(value.getAll(), localObjectInspector));
     } catch (SerDeException e) {
       throw new IOException("Failed to serialize object", e);
@@ -137,9 +132,8 @@ public void write(WritableComparable key, HCatRecord value)
     private SerDe localSerDe;
     private OutputJobInfo localJobInfo;
 
-    public LocalFileWriter(RecordWriter localWriter,
-        ObjectInspector localObjectInspector, SerDe localSerDe,
-        OutputJobInfo localJobInfo) {
+    public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector,
+        SerDe localSerDe, OutputJobInfo localJobInfo) {
       this.localWriter = localWriter;
       this.localObjectInspector = localObjectInspector;
       this.localSerDe = localSerDe;
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
index 0b4c42e..b3ea76e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java
@@ -31,8 +31,8 @@ import org.apache.hive.hcatalog.data.HCatRecord;
 
 /**
- * Record writer container for tables using static partitioning.
- * See {@link FileOutputFormatContainer} for more information
+ * Record writer container for tables using static partitioning. See
+ * {@link FileOutputFormatContainer} for more information
  */
 class StaticPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
   /**
@@ -42,24 +42,19 @@
    * @throws InterruptedException
    */
   public StaticPartitionFileRecordWriterContainer(
-      RecordWriter<
-          ? super WritableComparable<?>,
-          ? super Writable> baseWriter,
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     super(baseWriter, context);
   }
 
   @Override
-  public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
     Reporter reporter = InternalUtil.createReporter(context);
     getBaseRecordWriter().close(reporter);
   }
 
   @Override
-  protected LocalFileWriter getLocalFileWriter(HCatRecord value)
-      throws IOException, HCatException {
-    return new LocalFileWriter(
-        getBaseRecordWriter(), objectInspector, serDe, jobInfo);
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
+    return new LocalFileWriter(getBaseRecordWriter(), objectInspector, serDe, jobInfo);
   }
 }
-- 
1.8.3.4 (Apple Git-47)