From 5d6614b3717fd6374813c328073c15b7fd4cf0d8 Mon Sep 17 00:00:00 2001
From: "David Z. Chen"
Date: Tue, 20 May 2014 02:17:10 -0700
Subject: [PATCH] HIVE-7094: Separate static and dynamic partitioning
 implementations from FileRecordWriterContainer.

---
 .../DynamicFileRecordWriterContainer.java          | 239 +++++++++++++++++++++
 .../mapreduce/FileOutputFormatContainer.java       |  17 +-
 .../mapreduce/FileRecordWriterContainer.java       | 230 ++++++--------------
 .../mapreduce/StaticFileRecordWriterContainer.java |  65 ++++++
 4 files changed, 377 insertions(+), 174 deletions(-)
 create mode 100644 hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicFileRecordWriterContainer.java
 create mode 100644 hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticFileRecordWriterContainer.java

diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicFileRecordWriterContainer.java
new file mode 100644
index 0000000..b05dbb9
--- /dev/null
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicFileRecordWriterContainer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.ErrorType;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Part of the FileOutput*Container classes. See
+ * {@link FileOutputFormatContainer} for more information.
+ */
+class DynamicFileRecordWriterContainer extends FileRecordWriterContainer {
+  private final List<Integer> dynamicPartCols;
+  private int maxDynamicPartitions;
+
+  private final Map<String, RecordWriter<? super WritableComparable<?>,
+      ? super Writable>> baseDynamicWriters;
+  private final Map<String, SerDe> baseDynamicSerDe;
+  private final Map<
+      String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
+  private final Map<
+      String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
+  private final Map<String, ObjectInspector> dynamicObjectInspectors;
+  private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+
+  /**
+   * @param baseWriter RecordWriter to contain
+   * @param context current TaskAttemptContext
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public DynamicFileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    super(baseWriter, context);
+    maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+    dynamicPartCols = jobInfo.getPosOfDynPartCols();
+    if (dynamicPartCols == null) {
+      throw new HCatException("It seems that setSchema() is not called on "
+          + "HCatOutputFormat. Please make sure that method is called.");
+    }
+
+    this.baseDynamicSerDe = new HashMap<String, SerDe>();
+    this.baseDynamicWriters = new HashMap<String,
+        RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+    this.baseDynamicCommitters = new HashMap<
+        String, org.apache.hadoop.mapred.OutputCommitter>();
+    this.dynamicContexts = new HashMap<
+        String, org.apache.hadoop.mapred.TaskAttemptContext>();
+    this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
+    this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
+  }
+
+  @Override
+  public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Reporter reporter = InternalUtil.createReporter(context);
+    for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter
+        : baseDynamicWriters.values()) {
+      // We are in RecordWriter.close(), so it makes sense that the context
+      // would be TaskInputOutput.
+      bwriter.close(reporter);
+    }
+    for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry
+        : baseDynamicCommitters.entrySet()) {
+      org.apache.hadoop.mapred.TaskAttemptContext currContext =
+          dynamicContexts.get(entry.getKey());
+      OutputCommitter baseOutputCommitter = entry.getValue();
+      if (baseOutputCommitter.needsTaskCommit(currContext)) {
+        baseOutputCommitter.commitTask(currContext);
+      }
+    }
+  }
+
+  @Override
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value)
+      throws IOException, HCatException {
+    OutputJobInfo localJobInfo = null;
+    // Calculate which writer to use from the remaining values - this needs to
+    // be done before we delete cols.
+    List<String> dynamicPartValues = new ArrayList<String>();
+    for (Integer colToAppend : dynamicPartCols) {
+      dynamicPartValues.add(value.get(colToAppend).toString());
+    }
+
+    String dynKey = dynamicPartValues.toString();
+    if (!baseDynamicWriters.containsKey(dynKey)) {
+      if ((maxDynamicPartitions != -1) &&
+          (baseDynamicWriters.size() > maxDynamicPartitions)) {
+        throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+            "Number of dynamic partitions being created "
+                + "exceeds configured max allowable partitions["
+                + maxDynamicPartitions
+                + "], increase parameter ["
+                + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                + "] if needed.");
+      }
+
+      org.apache.hadoop.mapred.TaskAttemptContext currTaskContext =
+          HCatMapRedUtil.createTaskAttemptContext(context);
+      configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
+      localJobInfo = HCatBaseOutputFormat.getJobInfo(
+          currTaskContext.getConfiguration());
+
+      // Set up the SerDe.
+      SerDe currSerDe = ReflectionUtils.newInstance(
+          storageHandler.getSerDeClass(), currTaskContext.getJobConf());
+      try {
+        InternalUtil.initializeOutputSerDe(
+            currSerDe, currTaskContext.getConfiguration(), localJobInfo);
+      } catch (SerDeException e) {
+        throw new IOException("Failed to initialize SerDe", e);
+      }
+
+      // Create the base OutputFormat.
+      org.apache.hadoop.mapred.OutputFormat baseOF =
+          ReflectionUtils.newInstance(
+              storageHandler.getOutputFormatClass(),
+              currTaskContext.getJobConf());
+
+      // We skip calling checkOutputSpecs() for each partition, as it can
+      // throw a FileAlreadyExistsException when more than one mapper is
+      // writing to a partition. See HCATALOG-490; this also avoids contacting
+      // the namenode for each new FileOutputFormat instance. In general this
+      // should be OK for most FileOutputFormat implementations, but it may
+      // become an issue when the method is used to perform other setup tasks.
+
+      // Get the output committer.
+      org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
+          currTaskContext.getJobConf().getOutputCommitter();
+
+      // Create currJobContext last so that it picks up all the config changes.
+      org.apache.hadoop.mapred.JobContext currJobContext =
+          HCatMapRedUtil.createJobContext(currTaskContext);
+
+      // Set up the job.
+      baseOutputCommitter.setupJob(currJobContext);
+
+      // Recreate the task context to refresh the jobConf it holds.
+      currTaskContext =
+          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+              currTaskContext.getTaskAttemptID(),
+              currTaskContext.getProgressible());
+
+      // Set the temp location.
+      currTaskContext.getConfiguration().set(
+          "mapred.work.output.dir",
+          new FileOutputCommitter(
+              new Path(localJobInfo.getLocation()), currTaskContext)
+              .getWorkPath().toString());
+
+      // Set up the task.
+      baseOutputCommitter.setupTask(currTaskContext);
+
+      Path parentDir = new Path(
+          currTaskContext.getConfiguration().get("mapred.work.output.dir"));
+      Path childPath = new Path(
+          parentDir,
+          FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
+
+      RecordWriter baseRecordWriter = baseOF.getRecordWriter(
+          parentDir.getFileSystem(currTaskContext.getConfiguration()),
+          currTaskContext.getJobConf(),
+          childPath.toString(),
+          InternalUtil.createReporter(currTaskContext));
+
+      baseDynamicWriters.put(dynKey, baseRecordWriter);
+      baseDynamicSerDe.put(dynKey, currSerDe);
+      baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+      dynamicContexts.put(dynKey, currTaskContext);
+      dynamicObjectInspectors.put(
+          dynKey,
+          InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+      dynamicOutputJobInfo.put(
+          dynKey,
+          HCatOutputFormat.getJobInfo(
+              dynamicContexts.get(dynKey).getConfiguration()));
+    }
+
+    return new LocalFileWriter(
+        baseDynamicWriters.get(dynKey),
+        dynamicObjectInspectors.get(dynKey),
+        baseDynamicSerDe.get(dynKey),
+        dynamicOutputJobInfo.get(dynKey));
+  }
+
+  protected void configureDynamicStorageHandler(
+      JobContext context, List<String> dynamicPartVals) throws IOException {
+    HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
+  }
+}
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
index e9ca263..613cca3 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
@@ -95,18 +95,19 @@ public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java
-  private final Map<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
-  private final Map<String, SerDe> baseDynamicSerDe;
-  private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
-  private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
-  private final Map<String, ObjectInspector> dynamicObjectInspectors;
-  private Map<String, OutputJobInfo> dynamicOutputJobInfo;
+abstract class FileRecordWriterContainer extends RecordWriterContainer {
+  protected final HiveStorageHandler storageHandler;
+  protected final SerDe serDe;
+  protected final ObjectInspector objectInspector;
   private final List<Integer> partColsToDel;
-  private final List<Integer> dynamicPartCols;
-  private int maxDynamicPartitions;
-  private OutputJobInfo jobInfo;
-  private TaskAttemptContext context;
+  protected OutputJobInfo jobInfo;
+  protected TaskAttemptContext context;
 
   /**
    * @param baseWriter RecordWriter to contain
@@ -79,48 +68,33 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
-      TaskAttemptContext context) throws IOException, InterruptedException {
+  public FileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context)
+      throws IOException, InterruptedException {
     super(context, baseWriter);
     this.context = context;
     jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration());
-    storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
-    serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
-    objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
+    storageHandler = HCatUtil.getStorageHandler(
+        context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+    serDe = ReflectionUtils.newInstance(
+        storageHandler.getSerDeClass(), context.getConfiguration());
+    objectInspector = InternalUtil.createStructObjectInspector(
+        jobInfo.getOutputSchema());
     try {
-      InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
+      InternalUtil.initializeOutputSerDe(
+          serDe, context.getConfiguration(), jobInfo);
     } catch (SerDeException e) {
       throw new IOException("Failed to inialize SerDe", e);
     }
 
     // If partition columns occur in data, we want to remove them.
     partColsToDel = jobInfo.getPosOfPartCols();
-    dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
-    dynamicPartCols = jobInfo.getPosOfDynPartCols();
-    maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
-
-    if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
+    if (partColsToDel == null) {
       throw new HCatException("It seems that setSchema() is not called on "
          + "HCatOutputFormat. Please make sure that method is called.");
     }
-
-
-    if (!dynamicPartitioningUsed) {
-      this.baseDynamicSerDe = null;
-      this.baseDynamicWriters = null;
-      this.baseDynamicCommitters = null;
-      this.dynamicContexts = null;
-      this.dynamicObjectInspectors = null;
-      this.dynamicOutputJobInfo = null;
-    } else {
-      this.baseDynamicSerDe = new HashMap<String, SerDe>();
-      this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
-      this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
-      this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
-      this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
-      this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>();
-    }
   }
 
   /**
@@ -130,138 +104,62 @@
   public HiveStorageHandler getStorageHandler() {
     return storageHandler;
   }
 
-  @Override
-  public void close(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    Reporter reporter = InternalUtil.createReporter(context);
-    if (dynamicPartitioningUsed) {
-      for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
-        //We are in RecordWriter.close() make sense that the context would be TaskInputOutput
-        bwriter.close(reporter);
-      }
-      for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
-        org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
-        OutputCommitter baseOutputCommitter = entry.getValue();
-        if (baseOutputCommitter.needsTaskCommit(currContext)) {
-          baseOutputCommitter.commitTask(currContext);
-        }
-      }
-    } else {
-      getBaseRecordWriter().close(reporter);
-    }
-  }
+  abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value)
+      throws IOException, HCatException;
 
   @Override
-  public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
-      InterruptedException {
-
-    org.apache.hadoop.mapred.RecordWriter localWriter;
-    ObjectInspector localObjectInspector;
-    SerDe localSerDe;
-    OutputJobInfo localJobInfo = null;
-
-    if (dynamicPartitioningUsed) {
-      // calculate which writer to use from the remaining values - this needs to be done before we delete cols
-      List<String> dynamicPartValues = new ArrayList<String>();
-      for (Integer colToAppend : dynamicPartCols) {
-        dynamicPartValues.add(value.get(colToAppend).toString());
-      }
-
-      String dynKey = dynamicPartValues.toString();
-      if (!baseDynamicWriters.containsKey(dynKey)) {
-        if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
-          throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-            "Number of dynamic partitions being created "
-              + "exceeds configured max allowable partitions["
-              + maxDynamicPartitions
-              + "], increase parameter ["
-              + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-              + "] if needed.");
-        }
-
-        org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
-        configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
-        localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration());
-
-        //setup serDe
-        SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
-        try {
-          InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
-        } catch (SerDeException e) {
-          throw new IOException("Failed to initialize SerDe", e);
-        }
-
-        //create base OutputFormat
-        org.apache.hadoop.mapred.OutputFormat baseOF =
-          ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
-
-        //We are skipping calling checkOutputSpecs() for each partition
-        //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition
-        //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
-        //In general this should be ok for most FileOutputFormat implementations
-        //but may become an issue for cases when the method is used to perform other setup tasks
-
-        //get Output Committer
-        org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
-        //create currJobContext the latest so it gets all the config changes
-        org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
-        //setupJob()
-        baseOutputCommitter.setupJob(currJobContext);
-        //recreate to refresh jobConf of currTask context
-        currTaskContext =
-          HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
-            currTaskContext.getTaskAttemptID(),
-            currTaskContext.getProgressible());
-        //set temp location
-        currTaskContext.getConfiguration().set("mapred.work.output.dir",
-          new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
-        //setupTask()
-        baseOutputCommitter.setupTask(currTaskContext);
-
-        Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir"));
-        Path childPath = new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", ""));
-
-        org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
-          baseOF.getRecordWriter(
-            parentDir.getFileSystem(currTaskContext.getConfiguration()),
-            currTaskContext.getJobConf(),
-            childPath.toString(),
-            InternalUtil.createReporter(currTaskContext));
-
-        baseDynamicWriters.put(dynKey, baseRecordWriter);
-        baseDynamicSerDe.put(dynKey, currSerDe);
-        baseDynamicCommitters.put(dynKey, baseOutputCommitter);
-        dynamicContexts.put(dynKey, currTaskContext);
-        dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
-        dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration()));
-      }
-
-      localJobInfo = dynamicOutputJobInfo.get(dynKey);
-      localWriter = baseDynamicWriters.get(dynKey);
-      localSerDe = baseDynamicSerDe.get(dynKey);
-      localObjectInspector = dynamicObjectInspectors.get(dynKey);
-    } else {
-      localJobInfo = jobInfo;
-      localWriter = getBaseRecordWriter();
-      localSerDe = serDe;
-      localObjectInspector = objectInspector;
-    }
+  public void write(WritableComparable<?> key, HCatRecord value)
+      throws IOException, InterruptedException {
+    LocalFileWriter localFileWriter = getLocalFileWriter(value);
+    RecordWriter localWriter = localFileWriter.getLocalWriter();
+    ObjectInspector localObjectInspector =
+        localFileWriter.getLocalObjectInspector();
+    SerDe localSerDe = localFileWriter.getLocalSerDe();
+    OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo();
 
     for (Integer colToDel : partColsToDel) {
       value.remove(colToDel);
     }
-
-    //The key given by user is ignored
+    // The key given by the user is ignored.
     try {
-      localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
+      localWriter.write(
+          NullWritable.get(),
+          localSerDe.serialize(value.getAll(), localObjectInspector));
     } catch (SerDeException e) {
       throw new IOException("Failed to serialize object", e);
     }
   }
 
-  protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) throws IOException {
-    HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
-  }
+  class LocalFileWriter {
+    private RecordWriter localWriter;
+    private ObjectInspector localObjectInspector;
+    private SerDe localSerDe;
+    private OutputJobInfo localJobInfo;
+
+    public LocalFileWriter(RecordWriter localWriter,
+        ObjectInspector localObjectInspector, SerDe localSerDe,
+        OutputJobInfo localJobInfo) {
+      this.localWriter = localWriter;
+      this.localObjectInspector = localObjectInspector;
+      this.localSerDe = localSerDe;
+      this.localJobInfo = localJobInfo;
+    }
+
+    public RecordWriter getLocalWriter() {
+      return localWriter;
+    }
+
+    public ObjectInspector getLocalObjectInspector() {
+      return localObjectInspector;
+    }
+
+    public SerDe getLocalSerDe() {
+      return localSerDe;
+    }
+
+    public OutputJobInfo getLocalJobInfo() {
+      return localJobInfo;
+    }
+  }
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticFileRecordWriterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticFileRecordWriterContainer.java
new file mode 100644
index 0000000..4786706
--- /dev/null
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticFileRecordWriterContainer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hive.hcatalog.mapreduce.FileRecordWriterContainer;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * Part of the FileOutput*Container classes. See
+ * {@link FileOutputFormatContainer} for more information.
+ */
+class StaticFileRecordWriterContainer extends FileRecordWriterContainer {
+  /**
+   * @param baseWriter RecordWriter to contain
+   * @param context current TaskAttemptContext
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public StaticFileRecordWriterContainer(
+      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    super(baseWriter, context);
+  }
+
+  @Override
+  public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Reporter reporter = InternalUtil.createReporter(context);
+    getBaseRecordWriter().close(reporter);
+  }
+
+  @Override
+  protected LocalFileWriter getLocalFileWriter(HCatRecord value)
+      throws IOException, HCatException {
+    return new LocalFileWriter(
+        getBaseRecordWriter(), objectInspector, serDe, jobInfo);
+  }
+}
-- 
1.8.3.4 (Apple Git-47)
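Note on the FileOutputFormatContainer change: its hunk body is not legible in this copy of the patch, so the sketch below only illustrates the intent of the refactoring, namely choosing between the two new containers based on OutputJobInfo.isDynamicPartitioningUsed(). The factory class name and this exact wiring are assumptions for illustration; in the patch itself the decision presumably lives inside FileOutputFormatContainer, whose 17-line change appears in the diffstat above.

// Illustrative sketch only; not part of this patch. Placed in the same
// package because the container classes are package-private. The factory
// class name and wiring are assumptions made for this example.
package org.apache.hive.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class FileRecordWriterContainerFactory {
  static FileRecordWriterContainer create(
      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
      TaskAttemptContext context) throws IOException, InterruptedException {
    // OutputJobInfo records whether the job uses dynamic partitioning.
    OutputJobInfo jobInfo =
        HCatOutputFormat.getJobInfo(context.getConfiguration());
    if (jobInfo.isDynamicPartitioningUsed()) {
      // Dynamic partitioning: a base writer, committer, and SerDe are created
      // lazily per partition key in getLocalFileWriter().
      return new DynamicFileRecordWriterContainer(baseWriter, context);
    }
    // Static partitioning: every record goes through the single base writer.
    return new StaticFileRecordWriterContainer(baseWriter, context);
  }
}

Either way, the per-partition setup stays encapsulated in DynamicFileRecordWriterContainer.getLocalFileWriter(), while the static case simply wraps the one base writer, which is the split this patch introduces.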