diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 288b7a3..334a1ea 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -101,7 +100,6 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
 
   @Override
   public void configureInputJobProperties(TableDesc tableDesc,
                                           Map<String, String> jobProperties) {
-
   }
 
@@ -156,10 +154,9 @@ public void configureOutputJobProperties(TableDesc tableDesc,
         jobProperties.put("mapred.output.dir", jobInfo.getLocation());
       }
 
-      //TODO find a better home for this, RCFile specifc
-      jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
-          Integer.toOctalString(
-              jobInfo.getOutputSchema().getFields().size()));
+      SpecialCases.addSpecialCasesParametersToOutputJobProperties(jobProperties, jobInfo, ofClass);
+
+      jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
 
 
     } catch (IOException e) {
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
new file mode 100644
index 0000000..4d31866
--- /dev/null
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/SpecialCases.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is a place to put all the code associated with
+ * Special cases. If there is a corner case required to make
+ * a particular format work that is above and beyond the generic
+ * use, it belongs here, for example. Over time, the goal is to
+ * try to minimize usage of this, but it is a useful overflow
+ * class that allows us to still be as generic as possible
+ * in the main codeflow path, and call attention to the special
+ * cases here.
+ * Note : For all methods introduced here, please document why
+ * the special case is necessary, providing a jira number if
+ * possible.
+ */
+public class SpecialCases {
+
+  static final private Log LOG = LogFactory.getLog(SpecialCases.class);
+
+  // Orc-specific parameter definitions
+  private final static List<String> orcTablePropsToCopy = Arrays.asList(
+      OrcFile.STRIPE_SIZE,
+      OrcFile.COMPRESSION,
+      OrcFile.COMPRESSION_BLOCK_SIZE,
+      OrcFile.ROW_INDEX_STRIDE,
+      OrcFile.ENABLE_INDEXES,
+      OrcFile.BLOCK_PADDING
+      );
+
+  public static void addSpecialCasesParametersToOutputJobProperties(
+      Map<String, String> jobProperties,
+      OutputJobInfo jobInfo, Class<? extends OutputFormat> ofclass) {
+    if (ofclass == RCFileOutputFormat.class) {
+      // RCFile specific parameter
+      jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
+          Integer.toOctalString(
+              jobInfo.getOutputSchema().getFields().size()));
+    } else if (ofclass == OrcOutputFormat.class) {
+      // Special cases for ORC
+      // We need to check table properties to see if a couple of parameters,
+      // such as compression parameters are defined. If they are, then we copy
+      // them to job properties, so that it will be available in jobconf at runtime
+      // See HIVE-5504 for details
+      Map<String, String> tableProps = jobInfo.getTableInfo().getTable().getParameters();
+      for (String s : orcTablePropsToCopy){
+        if (tableProps.containsKey(s)){
+          jobProperties.put(s,tableProps.get(s));
+        }
+      }
+    }
+  }
+
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 62e7b34..b61aaec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -106,58 +106,71 @@ public SerDeStats getStats() {
     }
   }
 
-  @Override
-  public RecordWriter<NullWritable, OrcSerdeRow>
-      getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
-                      Progressable reporter) throws IOException {
-    return new
-        OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
+  /**
+   * Helper method to get a parameter first from props if present, falling back to JobConf if not.
+   * Returns null if key is present in neither.
+   */
+  private String getSettingFromPropsFallingBackToConf(String key, Properties props, JobConf conf){
+    if ((props != null) && props.containsKey(key)){
+      return props.getProperty(key);
+    } else if(conf != null) {
+      // If conf is not null, and the key is not present, Configuration.get() will
+      // return null for us. So, we don't have to check if it contains it.
+      return conf.get(key);
+    } else {
+      return null;
+    }
   }
 
-  @Override
-  public FSRecordWriter
-      getHiveRecordWriter(JobConf conf,
-                          Path path,
-                          Class<? extends Writable> valueClass,
-                          boolean isCompressed,
-                          Properties tableProperties,
-                          Progressable reporter) throws IOException {
+  private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
-    if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
-      options.stripeSize(Long.parseLong
-          (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+    String s ;
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.STRIPE_SIZE,props,conf)) != null){
+      options.stripeSize(Long.parseLong(s));
     }
-    if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
-      options.compress(CompressionKind.valueOf
-          (tableProperties.getProperty(OrcFile.COMPRESSION)));
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION,props,conf)) != null){
+      options.compress(CompressionKind.valueOf(s));
     }
-    if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
-      options.bufferSize(Integer.parseInt
-          (tableProperties.getProperty
-          (OrcFile.COMPRESSION_BLOCK_SIZE)));
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.COMPRESSION_BLOCK_SIZE,props,conf)) != null){
+      options.bufferSize(Integer.parseInt(s));
     }
-    if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
-      options.rowIndexStride(Integer.parseInt
-          (tableProperties.getProperty
-          (OrcFile.ROW_INDEX_STRIDE)));
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.ROW_INDEX_STRIDE,props,conf)) != null){
+      options.rowIndexStride(Integer.parseInt(s));
     }
-    if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
-      if ("false".equals(tableProperties.getProperty
-          (OrcFile.ENABLE_INDEXES))) {
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.ENABLE_INDEXES,props,conf)) != null){
+      if ("false".equalsIgnoreCase(s)) {
         options.rowIndexStride(0);
       }
     }
-    if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
-      options.blockPadding(Boolean.parseBoolean
-          (tableProperties.getProperty
-          (OrcFile.BLOCK_PADDING)));
+    if ((s = getSettingFromPropsFallingBackToConf(OrcFile.BLOCK_PADDING,props,conf)) != null){
+      options.blockPadding(Boolean.parseBoolean(s));
     }
-    return new OrcRecordWriter(path, options);
+    return options;
+  }
+
+  @Override
+  public RecordWriter<NullWritable, OrcSerdeRow>
+      getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
+                      Progressable reporter) throws IOException {
+    return new
+        OrcRecordWriter(new Path(name), getOptions(conf,null));
+  }
+
+
+  @Override
+  public FSRecordWriter
+      getHiveRecordWriter(JobConf conf,
+                          Path path,
+                          Class<? extends Writable> valueClass,
+                          boolean isCompressed,
+                          Properties tableProperties,
+                          Progressable reporter) throws IOException {
+    return new OrcRecordWriter(path, getOptions(conf,tableProperties));
+  }
 }