diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 2e2a80e..b856522 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -101,13 +102,24 @@ public void write(Writable w) throws IOException {
   }
 
   @Override
+  public void setConf(Configuration conf) {
+    ensureTableNameInConf(conf);
+    super.setConf(conf);
+  }
+
+  // TableOutputFormat.setConf complains if OUTPUT_TABLE is not set, but we have
+  // a different parameter name for it. Copy it in.
+  private void ensureTableNameInConf(Configuration conf){
+    conf.set(TableOutputFormat.OUTPUT_TABLE, conf.get(HBaseSerDe.HBASE_TABLE_NAME));
+  }
+
+  @Override
   public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
 
     //obtain delegation tokens for the job
     TableMapReduceUtil.initCredentials(jc);
 
-    String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
-    jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+    ensureTableNameInConf(jc);
 
     Job job = new Job(jc);
     JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java b/hcatalog/core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
index 663b6c9..ae9878a 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
@@ -202,7 +202,7 @@ public void postAnalyze(HiveSemanticAnalyzerHookContext context,
           desc.getSerName(), desc.getInputFormat(), desc.getOutputFormat());
-        //Authorization checks are performed by the storageHandler.getAuthorizationProvider(), if 
+        //Authorization checks are performed by the storageHandler.getAuthorizationProvider(), if
         //StorageDelegationAuthorizationProvider is used.
       } catch (IOException e) {
         throw new SemanticException(e);
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java
index 6447b22..03441a0 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/common/HCatUtil.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -68,6 +69,7 @@
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.mapreduce.PartInfo;
 import org.apache.hcatalog.mapreduce.StorerInfo;
+import org.apache.hcatalog.mapreduce.WrapperStorageHandler;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -420,12 +422,20 @@ public static HCatStorageHandler getStorageHandler(Configuration conf,
     }
 
     try {
-      Class<? extends HCatStorageHandler> handlerClass =
-        (Class<? extends HCatStorageHandler>) Class
-          .forName(storageHandler, true, JavaUtils.getClassLoader());
-      return (HCatStorageHandler) ReflectionUtils.newInstance(
-        handlerClass, conf);
-    } catch (ClassNotFoundException e) {
+      Class<? extends HiveStorageHandler> handlerClass =
+        (Class<? extends HiveStorageHandler>) Class.forName(
+          storageHandler, true, JavaUtils.getClassLoader());
+      if (HCatStorageHandler.class.isAssignableFrom(handlerClass)){
+        HCatStorageHandler wrapperStorageHandler =
+          (HCatStorageHandler) ReflectionUtils.newInstance(handlerClass, conf);
+        wrapperStorageHandler.setConf(conf);
+        return wrapperStorageHandler;
+      } else {
+        return new WrapperStorageHandler(handlerClass, conf);
+      }
+    } catch (Exception e) {
+      System.err.println(e.getMessage());
+      e.printStackTrace(System.err);
       throw new IOException("Error in loading storage handler."
         + e.getMessage(), e);
     }
@@ -462,6 +472,14 @@ public static HCatStorageHandler getStorageHandler(Configuration conf,
       storageHandler.configureInputJobProperties(tableDesc, jobProperties);
+
+      // Copy any other conf parameters that the storage handler additionally set
+      // but wasn't present in the tableDesc.getJobProperties() we passed in.
+      for (Map.Entry<String, String> el : storageHandler.getConf()) {
+        if (! tableDesc.getJobProperties().containsKey(el.getKey())){
+          jobProperties.put(el.getKey(), el.getValue());
+        }
+      }
     } catch (IOException e) {
       throw new IllegalStateException(
@@ -483,8 +501,9 @@ public static HCatStorageHandler getStorageHandler(Configuration conf,
         storageHandler.getInputFormatClass(),
         IgnoreKeyTextOutputFormat.class,
         outputJobInfo.getTableInfo().getStorerInfo().getProperties());
-      if (tableDesc.getJobProperties() == null)
+      if (tableDesc.getJobProperties() == null) {
         tableDesc.setJobProperties(new HashMap<String, String>());
+      }
       for (Map.Entry<String, String> el : conf) {
         tableDesc.getJobProperties().put(el.getKey(), el.getValue());
       }
@@ -501,6 +520,11 @@ public static HCatStorageHandler getStorageHandler(Configuration conf,
       for (Map.Entry<String, String> el : jobProperties.entrySet()) {
         conf.set(el.getKey(), el.getValue());
       }
+      for (Map.Entry<String, String> el : storageHandler.getConf()) {
+        if (! tableDesc.getJobProperties().containsKey(el.getKey())){
+          conf.set(el.getKey(), el.getValue());
+        }
+      }
     } catch (IOException e) {
       throw new IllegalStateException(
         "Failed to configure StorageHandler", e);
     }
@@ -549,8 +573,9 @@ public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
 
   public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
     try {
-      if (client != null)
+      if (client != null) {
         client.close();
+      }
     } catch (Exception e) {
       LOG.debug("Error closing metastore client. Ignored the error.", e);
     }
@@ -616,12 +641,13 @@ public static void copyJobPropertiesToJobConf(
       jobConf.set(entry.getKey(), entry.getValue());
     }
   }
-  
+
   public static boolean isHadoop23() {
     String version = org.apache.hadoop.util.VersionInfo.getVersion();
-    if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*"))
+    if (version.matches("\\b0\\.23\\..+\\b")||version.matches("\\b2\\..*")) {
       return true;
+    }
     return false;
   }
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
index 2c630b6..3a3a65f 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
@@ -19,9 +19,12 @@
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -30,9 +33,6 @@
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-
 /**
  * Bare bones implementation of OutputFormatContainer. Does only the required
 * tasks to work properly with HCatalog. HCatalog features which require a
@@ -41,6 +41,7 @@ class DefaultOutputFormatContainer extends OutputFormatContainer {
 
   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  private boolean isHiveOutputFormat = true;
 
   static {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
@@ -49,6 +50,31 @@
   public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
     super(of);
+    if (
+        isHBaseHCatStorageHandlerOutputformat(of.getClass())
+        || (!HiveOutputFormat.class.isAssignableFrom(of.getClass()))
+        ){
+      // The HBaseHCatStorageHandler OutputFormats are HiveOutputFormats too, but
+      // throw runtime errors if getHiveRecordWriter() is called on them - they
+      // should be treated strictly as a non-HiveOutputFormat
+      isHiveOutputFormat = false;
+    }
+  }
+
+
+  /**
+   * Determine if outputformat class is a HBaseHCatStorageHandler class
+   * Required to determine if it is safe to call .getHiveRecordWriter
+   * We're not able to use instanceof or isAssignableFrom because
+   * that module does not yet exist when this is compiled, and is on
+   * a deprecation path
+   */
+  private boolean isHBaseHCatStorageHandlerOutputformat(Class ofc) {
+    String ofcName = ofc.getName();
+    return ( ofcName.equals("org.apache.hcatalog.hbase.HBaseBaseOutputFormat")
+        || ofcName.equals("org.apache.hcatalog.hbase.HBaseBulkOutputFormat")
+        || ofcName.equals("org.apache.hcatalog.hbase.HBaseDirectOutputFormat")
+        );
   }
 
   static synchronized String getOutputName(int partition) {
@@ -66,8 +92,21 @@ static synchronized String getOutputName(int partition) {
   public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context)
     throws IOException, InterruptedException {
     String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
-    return new DefaultRecordWriterContainer(context,
-      getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
+    JobConf jc = new JobConf(context.getConfiguration());
+    if (!isHiveOutputFormat){
+      return new DefaultRecordWriterContainer(context,
+        getBaseOutputFormat().getRecordWriter(
+          null, jc, name, InternalUtil.createReporter(context)));
+    } else {
+      WrapperRecordWriter rwc = new WrapperRecordWriter(
+        ((HiveOutputFormat)getBaseOutputFormat()).getHiveRecordWriter(
+          jc, null, null, false, null, null)
+        );
+      // Going with default null args for finalOutputPath,
+      // valueClass, tableProperties & progress, and false for isCompressed
+      // override method for more specific storagehandlers
+      return new DefaultRecordWriterContainer(context, rwc);
+    }
   }
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java
index 0042a0e..ae118d0 100644
--- a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java
@@ -72,7 +72,13 @@ public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
     this.jobProperties = jobProperties;
     this.tableInfo = tableInfo;
-    this.storageHandlerClassName = storageHandler.getClass().getName();
+    if (storageHandler instanceof WrapperStorageHandler){
+      this.storageHandlerClassName =
+        ((WrapperStorageHandler) storageHandler)
+          .getUnderlyingHiveStorageHandler().getClass().getName();
+    } else {
+      this.storageHandlerClassName = storageHandler.getClass().getName();
+    }
     this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
     this.serdeClassName =
       storageHandler.getSerDeClass().getName();
     this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperRecordWriter.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperRecordWriter.java
new file mode 100644
index 0000000..ecfeeb5
--- /dev/null
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperRecordWriter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ *
+ * WrapperRecordWriterContainer wraps a HiveRecordWriter and implements a mapred.RecordWriter
+ * so that a RecordWriterContainer can contain it.
+ *
+ */
+
+class WrapperRecordWriter implements RecordWriter {
+
+  org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter hrw = null;
+
+  public WrapperRecordWriter(
+      org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter hiveRecordWriter) {
+    this.hrw = hiveRecordWriter;
+  }
+
+  @Override
+  public void close(Reporter arg0) throws IOException {
+    // no reporter gets passed in the HiveRecordWriter,
+    // only an abort param, which we default to false
+    hrw.close(false);
+  }
+
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    // key is ignored, value is the writable record to be written
+    hrw.write((Writable) value);
+  }
+
+}
diff --git a/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperStorageHandler.java
new file mode 100644
index 0000000..5d23f6e
--- /dev/null
+++ b/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/WrapperStorageHandler.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * This class is used to encapsulate an underlying HiveStorageHandler and do
+ * o.a.mapred->o.a.mapreduce conversions so HCatalog can use them.
+ */
+public class WrapperStorageHandler extends HCatStorageHandler {
+
+  private final Class<? extends HiveStorageHandler> handlerClass;
+  private final HiveStorageHandler handler;
+  private boolean isHiveOutputFormat = true;
+  private Class<? extends OutputFormat> outputFormatClass = null;
+
+  public WrapperStorageHandler(Class<? extends HiveStorageHandler> handlerClass,
+      Configuration conf) throws Exception {
+    this.handlerClass = handlerClass;
+    this.handler = handlerClass.newInstance();
+    if (conf != null){
+      this.handler.setConf(conf);
+    }
+  }
+
+  public WrapperStorageHandler(String handlerClassName) throws Exception {
+    this((Class<? extends HiveStorageHandler>) Class.forName(handlerClassName), null);
+  }
+
+  public HiveStorageHandler getUnderlyingHiveStorageHandler() {
+    return this.handler;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return handler.getInputFormatClass();
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    if (outputFormatClass == null){
+      // determines if outputformat is a HiveOutputFormat -
+      // if it isn't, we find the appropriate substitute
+      outputFormatClass = this.handler.getOutputFormatClass();
+      if (!HiveOutputFormat.class.isAssignableFrom(outputFormatClass)){
+        isHiveOutputFormat = false;
+        outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClass);
+      }
+    }
+    return outputFormatClass;
+  }
+
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+    return handler.getSerDeClass();
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return handler.getMetaHook();
+  }
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    String hcatInputInfo = tableDesc.getJobProperties().get(HCatConstants.HCAT_KEY_JOB_INFO);
+    handler.configureInputJobProperties(tableDesc, jobProperties);
+    if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_JOB_INFO)){
+      // If the underlying storage handler didn't copy the hcat input job info, we do
+      jobProperties.put(HCatConstants.HCAT_KEY_JOB_INFO, hcatInputInfo);
+    }
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    String hcatOutputInfo = tableDesc.getJobProperties().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    handler.configureOutputJobProperties(tableDesc, jobProperties);
+    if (!jobProperties.containsKey(HCatConstants.HCAT_KEY_OUTPUT_INFO)){
+      // If the underlying storage handler didn't copy the hcat output job info, we do
+      jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, hcatOutputInfo);
+    }
+  }
+
+  @Override
+  OutputFormatContainer getOutputFormatContainer(
+      org.apache.hadoop.mapred.OutputFormat outputFormat) {
+    return new DefaultOutputFormatContainer(outputFormat);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return handler.getConf();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    handler.setConf(conf);
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider()
+      throws HiveException {
+    return handler.getAuthorizationProvider();
+  }
+
+}