Index: src/test/org/apache/hcatalog/cli/DummyStorageHandler.java =================================================================== --- src/test/org/apache/hcatalog/cli/DummyStorageHandler.java (revision 1300322) +++ src/test/org/apache/hcatalog/cli/DummyStorageHandler.java (working copy) @@ -82,7 +82,7 @@ } @Override - public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureInputJobProperties(TableDesc tableDesc) { } @Override Index: src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java =================================================================== --- src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java (revision 1300322) +++ src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java (working copy) @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hcatalog.storagehandler; - -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.logging.Logger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.HCatRecordSerDe; -import org.apache.hcatalog.mapred.HCatMapredInputFormat; -import org.apache.hcatalog.mapred.HCatMapredOutputFormat; -import org.apache.hcatalog.mapreduce.HCatInputStorageDriver; -import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver; -import org.apache.hcatalog.mapreduce.HCatStorageHandler; -import org.apache.hcatalog.mapreduce.OutputJobInfo; - -/** - * This is a broken class and should be removed as - * part of HCATALOG-237 - */ -@Deprecated -public class HCatStorageHandlerImpl extends HCatStorageHandler { - - Class isd; - Class osd; - - Log LOG = LogFactory.getLog(HCatStorageHandlerImpl.class); - - public Class getInputStorageDriver() { - return 
isd; - } - - public Class getOutputStorageDriver() { - return osd; - } - - @Override - public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { - } - - @Override - public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public HiveAuthorizationProvider getAuthorizationProvider() - throws HiveException { - return new DummyHCatAuthProvider(); - } - - public void commitCreateTable(Table table) throws MetaException { - } - - @Override - public HiveMetaHook getMetaHook() { - return null; - } - -// public void configureTableJobProperties(TableDesc tableDesc, -// Map jobProperties) { -// // Information about the table and the job to be performed -// // We pass them on into the mepredif / mapredof -// -// Properties tprops = tableDesc.getProperties(); -// -// if(LOG.isDebugEnabled()){ -// LOG.debug("HCatStorageHandlerImpl configureTableJobProperties:"); -// HCatUtil.logStackTrace(LOG); -// HCatUtil.logMap(LOG, "jobProperties", jobProperties); -// if (tprops!= null){ -// HCatUtil.logEntrySet(LOG, "tableprops", tprops.entrySet()); -// } -// LOG.debug("tablename : "+tableDesc.getTableName()); -// } -// -// // copy existing table props first -// for (Entry e : tprops.entrySet()){ -// jobProperties.put((String)e.getKey(), (String)e.getValue()); -// } -// -// // try to set input format related properties -// try { -// HCatMapredInputFormat.setTableDesc(tableDesc,jobProperties); -// } catch (IOException ioe){ -// // ok, things are probably not going to work, but we -// // can't throw out exceptions per interface. So, we log. -// LOG.error("HCatInputFormat init fail " + ioe.getMessage()); -// LOG.error(ioe.getStackTrace()); -// } -// -// // try to set output format related properties -// try { -// HCatMapredOutputFormat.setTableDesc(tableDesc,jobProperties); -// } catch (IOException ioe){ -// // ok, things are probably not going to work, but we -// // can't throw out exceptions per interface. So, we log. -// LOG.error("HCatOutputFormat init fail " + ioe.getMessage()); -// LOG.error(ioe.getStackTrace()); -// } -// } - - @Override - public Configuration getConf() { - return null; - } - - @Override - public void setConf(Configuration conf) { - } - - @Override - public Class getSerDeClass() { - return HCatRecordSerDe.class; - } - - @Override - public final Class getInputFormatClass() { - return HCatMapredInputFormat.class; - } - - @Override - public final Class getOutputFormatClass() { - return HCatMapredOutputFormat.class; - } - -} Index: src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java (working copy) @@ -40,18 +40,19 @@ * maybe be needed by the handler's bundled artifacts (ie InputFormat, SerDe, etc). * Key value pairs passed into jobProperties is guaranteed to be set in the job's * configuration object. User's can retrieve "context" information from tableDesc. - * User's should avoid mutating tableDesc and only make changes in jobProperties. + * Users are expected to change tableDesc.getJobProperties() to pass along any info + * that they wish to be present in JobConf on the backend. 
* This method is expected to be idempotent such that a job called with the - * same tableDesc values should return the same key-value pairs in jobProperties. + * same tableDesc values should return the same key-value pairs in its jobProperties. * Any external state set by this method should remain the same if this method is * called again. It is up to the user to determine how best guarantee this invariant. * * This method in particular is to create a configuration for input. * @param tableDesc - * @param jobProperties */ - public abstract void configureInputJobProperties(TableDesc tableDesc, Map jobProperties); + public abstract void configureInputJobProperties(TableDesc tableDesc); + //TODO move this to HiveStorageHandler /** * This method is called to allow the StorageHandlers the chance Index: src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (working copy) @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; /** The class used to serialize and store the information read from the metadata server */ public class InputJobInfo implements Serializable{ @@ -44,9 +43,6 @@ /** The list of partitions matching the filter. */ private List partitions; - /** implementation specific job properties */ - private Properties properties; - /** job properties */ private Map jobProperties; @@ -72,7 +68,7 @@ MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName; this.tableName = tableName; this.filter = filter; - this.properties = new Properties(); + this.jobProperties = new HashMap(); } /** @@ -132,11 +128,17 @@ } /** - * Set/Get Property information to be passed down to *StorageDriver implementation - * put implementation specific storage driver configurations here - * @return the implementation specific job properties + * @return the jobProperties */ - public Properties getProperties() { - return properties; + public Map getJobProperties() { + return jobProperties; } + + + /** + * @param jobProperties the jobProperties to set + */ + public void setJobProperties(Map jobProperties) { + this.jobProperties = jobProperties; + } } Index: src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (working copy) @@ -95,9 +95,8 @@ } @Override - public void configureInputJobProperties(TableDesc tableDesc, - Map jobProperties) { - + public void configureInputJobProperties(TableDesc tableDesc) { + // noop impl for configureInputJobProperties } @Override Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (working copy) @@ -112,6 +112,13 @@ List partInfoList = new ArrayList(); inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table)); + + // Instantiate storage handler and job properties for storage handler + StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters()); + HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(job.getConfiguration(), + 
storerInfo); + HCatUtil.configureInputJobProperties( storageHandler, inputJobInfo); + if( table.getPartitionKeys().size() != 0 ) { //Partitioned table List parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(), @@ -124,12 +131,12 @@ if (parts != null && parts.size() > maxPart) { throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size()); } - + // populate partition info for (Partition ptn : parts){ PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), job.getConfiguration(), - inputJobInfo); + inputJobInfo,storageHandler); partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn)); partInfoList.add(partInfo); } @@ -138,7 +145,7 @@ //Non partitioned table PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(), job.getConfiguration(), - inputJobInfo); + inputJobInfo,storageHandler); partInfo.setPartitionValues(new HashMap()); partInfoList.add(partInfo); } @@ -176,26 +183,25 @@ static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters, Configuration conf, - InputJobInfo inputJobInfo) throws IOException{ + InputJobInfo inputJobInfo, HCatStorageHandler storageHandler) throws IOException{ HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd); - StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters); - + Properties hcatProperties = new Properties(); - HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, - storerInfo); + // copy the properties from storageHandler to jobProperties - MapjobProperties = HCatUtil.getInputJobProperties( - storageHandler, - inputJobInfo); - + MapjobProperties = inputJobInfo.getJobProperties(); + for (String key : parameters.keySet()){ if (key.startsWith(HCAT_KEY_PREFIX)){ hcatProperties.put(key, parameters.get(key)); } } + // FIXME - // Bloating partinfo with inputJobInfo is not good + // Bloating partinfo with InputJobInfo, TableInfo is not good + // We need to verify that inputJobInfo is indeed available at getRR time, + // and if so, not have that part of PartInfo. 
return new PartInfo(schema, storageHandler, sd.getLocation(), hcatProperties, jobProperties, inputJobInfo.getTableInfo()); Index: src/java/org/apache/hcatalog/common/HCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1300322) +++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy) @@ -542,32 +542,34 @@ } } - public static Map - getInputJobProperties(HCatStorageHandler storageHandler, + public static void + configureInputJobProperties(HCatStorageHandler storageHandler, InputJobInfo inputJobInfo) { TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(), storageHandler.getInputFormatClass(), storageHandler.getOutputFormatClass(), inputJobInfo.getTableInfo().getStorerInfo().getProperties()); - if(tableDesc.getJobProperties() == null) { - tableDesc.setJobProperties(new HashMap()); + + if(inputJobInfo.getJobProperties() == null) { + inputJobInfo.setJobProperties(new HashMap()); } + tableDesc.setJobProperties(inputJobInfo.getJobProperties()); - Map jobProperties = new HashMap(); + // CONCERN : TableDesc.getJobProperties needs a serialized InputJobInfo for now + // but InputJobInfo.getJobProperties is the same as TableDesc.getJobProperties + // This will work, but is kludgy - it should go away when we do away with + // InputJobInfo at some time in favour of TableDesc + try { tableDesc.getJobProperties().put( HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inputJobInfo)); + storageHandler.configureInputJobProperties(tableDesc); - storageHandler.configureInputJobProperties(tableDesc, - jobProperties); - } catch (IOException e) { throw new IllegalStateException( "Failed to configure StorageHandler",e); } - - return jobProperties; } Index: src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java (working copy) @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hcatalog.mapred; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Collection; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hcatalog.mapreduce.HCatSplit; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.HCatRecord; -import org.apache.hcatalog.data.Pair; -import org.apache.hcatalog.mapreduce.HCatInputFormat; -import org.apache.hcatalog.mapreduce.InitializeInput; -import org.apache.hcatalog.mapreduce.InputJobInfo; -import org.apache.hcatalog.shims.HCatHadoopShims; -import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl; -import org.apache.hadoop.hive.ql.plan.TableDesc; - - - -public class HCatMapredInputFormat implements InputFormat { - - - private static final Log LOG = LogFactory.getLog(HCatMapredInputFormat.class); - HCatInputFormat hci; - - public HCatMapredInputFormat(){ - hci = new HCatInputFormat(); - } - - @Override - public RecordReader getRecordReader(InputSplit split, JobConf job, - Reporter arg2) throws IOException { - try { - org.apache.hadoop.mapreduce.RecordReader rr; - TaskAttemptContext taContext - = HCatHadoopShims.Instance.get().createTaskAttemptContext(job, new TaskAttemptID()); - rr = hci.createRecordReader(((HiveHCatSplitWrapper)split).getHCatSplit(), taContext); - rr.initialize(((HiveHCatSplitWrapper)split).getHCatSplit(),taContext); - return (RecordReader) rr; - - } catch (java.lang.InterruptedException e){ - throw new IOException(e); - } - } - - @Override - public InputSplit[] getSplits(JobConf job, int arg1) throws IOException { - - try { - List hsplits = new ArrayList(); - for (org.apache.hadoop.mapreduce.InputSplit hs : hci.getSplits( - HCatHadoopShims.Instance.get().createJobContext(job, new JobID()))){ - HiveHCatSplitWrapper hwrapper = new HiveHCatSplitWrapper((HCatSplit)hs); - - String hwrapperPath = hwrapper.getPath().toString(); - String mapredInputDir = job.get("mapred.input.dir","null"); - - if(hwrapperPath.startsWith(mapredInputDir)){ - hsplits.add(hwrapper); - } - } - InputSplit[] splits = new InputSplit[hsplits.size()]; - for (int i = 0 ; i jobProperties) throws IOException{ - try { - Pair dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName()); - InputJobInfo info = InputJobInfo.create(dbAndTableName.first, dbAndTableName.second, ""); - jobProperties.put(HCatConstants.HCAT_KEY_JOB_INFO - ,InitializeInput.getSerializedHcatKeyJobInfo( - null, info,tableDesc.getProperties().getProperty("location"))); - } catch (Exception e){ 
- throw new IOException(e); - } - } - -} Index: src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java =================================================================== --- src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (working copy) @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hcatalog.mapred; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.HiveOutputFormat; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.Progressable; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.HCatRecord; -import org.apache.hcatalog.data.schema.HCatSchema; -import org.apache.hcatalog.data.Pair; -import org.apache.hcatalog.data.schema.HCatSchemaUtils; -import org.apache.hcatalog.data.schema.HCatSchemaUtils.CollectionBuilder; -import org.apache.hcatalog.mapreduce.HCatOutputFormat; -import org.apache.hcatalog.mapreduce.InitializeInput; -import org.apache.hcatalog.mapreduce.InputJobInfo; -import org.apache.hcatalog.mapreduce.OutputJobInfo; -import org.apache.hcatalog.shims.HCatHadoopShims; -import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -public class HCatMapredOutputFormat implements OutputFormat, HiveOutputFormat { - - HCatOutputFormat hco; - private static final Log LOG = 
LogFactory.getLog(HCatMapredOutputFormat.class); - - public HCatMapredOutputFormat() { - LOG.debug("HCatMapredOutputFormat init"); - hco = new HCatOutputFormat(); - } - - @Override - public void checkOutputSpecs(FileSystem arg0, JobConf arg1) - throws IOException { - LOG.debug("HCatMapredOutputFormat checkOutputSpecs"); - JobContext context = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID()); - try { - hco.checkOutputSpecs(context); - } catch (InterruptedException e) { - LOG.warn(e.getMessage()); - HCatUtil.logStackTrace(LOG); - } - } - - @Override - public RecordWriter getRecordWriter(FileSystem arg0, JobConf arg1, - String arg2, Progressable arg3) throws IOException { - // this is never really called from hive, but it's part of the IF interface - - LOG.debug("HCatMapredOutputFormat getRecordWriter"); - return getRW(arg1); - } - - public HCatMapredRecordWriter getRW(Configuration arg1) throws IOException { - try { - JobContext jc = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID()); - TaskAttemptContext taContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(arg1, new TaskAttemptID()); - return new HCatMapredOutputFormat.HCatMapredRecordWriter(hco,jc,taContext); - } catch (Exception e){ - throw new IOException(e); - } - } - - @Override - public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( - JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, - Properties tableProperties, Progressable progress) throws IOException { - LOG.debug("HCatMapredOutputFormat getHiveRecordWriter"); - final HCatMapredRecordWriter rw = getRW(jc); - return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { - public void write(Writable r) throws IOException { - rw.write(null, (HCatRecord) r); - } - public void close(boolean abort) throws IOException { - rw.setAbortStatus(abort); - rw.close(null); - } - }; - - } - - public static void setTableDesc(TableDesc tableDesc, Map jobProperties) throws IOException { - setTableDesc(tableDesc,jobProperties,new LinkedHashMap()); - } - - public static void setPartitionDesc(PartitionDesc ptnDesc, Map jobProperties) throws IOException { - setTableDesc(ptnDesc.getTableDesc(),jobProperties,ptnDesc.getPartSpec()); - } - - public static void setTableDesc(TableDesc tableDesc, Map jobProperties, Map ptnValues) throws IOException { - Pair dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName()); - - OutputJobInfo outputJobInfo = OutputJobInfo.create( - dbAndTableName.first, dbAndTableName.second, - ptnValues); - - Job job = new Job(new Configuration()); - // TODO : verify with thw if this needs to be shim-ed. There exists no current Shim - // for instantiating a Job, and we use it only temporarily. 
- - HCatOutputFormat.setOutput(job, outputJobInfo); - LOG.debug("HCatOutputFormat.setOutput() done"); - - // Now we need to set the schema we intend to write - - Properties tprops = tableDesc.getProperties(); - String columnNameProperty = tprops.getProperty(Constants.LIST_COLUMNS); - String columnTypeProperty = tprops.getProperty(Constants.LIST_COLUMN_TYPES); - - List columnNames; - // all table column names - if (columnNameProperty.length() == 0) { - columnNames = new ArrayList(); - } else { - columnNames = Arrays.asList(columnNameProperty.split(",")); - } - - List columnTypes; - // all column types - if (columnTypeProperty.length() == 0) { - columnTypes = new ArrayList(); - } else { - columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - } - - StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - HCatSchema hsch = HCatSchemaUtils.getHCatSchema(rowTypeInfo).getFields().get(0).getStructSubSchema(); - // getting inner schema, because it's the difference between struct and i:int,j:int. - // and that's what we need to provide to HCatOutputFormat - - LOG.debug("schema "+hsch.toString()); - HCatOutputFormat.setSchema(job, hsch); - - for (String confToSave : HCatConstants.OUTPUT_CONFS_TO_SAVE){ - String confVal = job.getConfiguration().get(confToSave); - if (confVal != null){ - jobProperties.put(confToSave, confVal); - } - } - - } - - public class HCatMapredRecordWriter implements org.apache.hadoop.mapred.RecordWriter, HCatRecord>{ - - org.apache.hadoop.mapreduce.RecordWriter writer; - org.apache.hadoop.mapreduce.OutputCommitter outputCommitter; - TaskAttemptContext taContext; - JobContext jc; - boolean jobIsSetup = false; - boolean wroteData = false; - boolean aborted = false; - - public HCatMapredRecordWriter( - HCatOutputFormat hco, JobContext jc, - TaskAttemptContext taContext) throws IOException{ - this.taContext = taContext; - try { - this.outputCommitter = hco.getOutputCommitter(taContext); - this.writer = hco.getRecordWriter(taContext); - } catch (java.lang.InterruptedException e){ - throw new IOException(e); - } - this.wroteData = false; - this.aborted = false; - } - - public void setAbortStatus(boolean abort) { - this.aborted = abort; - } - - @Override - public void close(Reporter arg0) throws IOException { - try { - writer.close(taContext); - if (outputCommitter.needsTaskCommit(taContext)){ - outputCommitter.commitTask(taContext); - } - if (this.wroteData && this.jobIsSetup){ - if (!this.aborted){ - outputCommitter.commitJob(taContext); - } else { - outputCommitter.cleanupJob(taContext); - } - } - } catch (Exception e){ - throw new IOException(e); - } - } - - @Override - public void write(WritableComparable arg0, HCatRecord arg1) throws IOException { - try { - if (!jobIsSetup){ - this.outputCommitter.setupJob(taContext); - jobIsSetup = true; - } - writer.write(arg0, arg1); - this.wroteData = true; - } catch (Exception e){ - throw new IOException(e); - } - } - - } -} Index: src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java =================================================================== --- src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java (revision 1300322) +++ src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java (working copy) @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hcatalog.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.mapreduce.HCatSplit; - -/** - * Even though HiveInputSplit expects an InputSplit to wrap, it - * expects getPath() to work from the underlying split. And since - * that's populated by HiveInputSplit only if the underlying - * split is a FileSplit, the HCatSplit that goes to Hive needs - * to be a FileSplit. And since FileSplit is a class, and - * mapreduce.InputSplit is also a class, we can't do the trick - * where we implement mapred.inputSplit and extend mapred.InputSplit. - * - * Thus, we compose the other HCatSplit, and work with it. - * - * Also, this means that reading HCat through Hive will only work - * when the underlying InputFormat's InputSplit has implemented - * a getPath() - either by subclassing FileSplit, or by itself - - * we make a best effort attempt to call a getPath() via reflection, - * but if that doesn't work, this isn't going to work. - * - */ -public class HiveHCatSplitWrapper extends FileSplit implements InputSplit { - - Log LOG = LogFactory.getLog(HiveHCatSplitWrapper.class); - - HCatSplit hsplit; - - public HiveHCatSplitWrapper() { - super((Path) null, 0, 0, (String[]) null); - } - - public HiveHCatSplitWrapper(HCatSplit hsplit) { - this(); - this.hsplit = hsplit; - } - - @Override - public void readFields(DataInput input) throws IOException { - hsplit = new HCatSplit(); - hsplit.readFields(input); - } - - @Override - public void write(DataOutput output) throws IOException { - hsplit.write(output); - } - - @Override - public long getLength() { - return hsplit.getLength(); - } - - @Override - public String[] getLocations() throws IOException { - return hsplit.getLocations(); - } - - @Override - public Path getPath() { - /** - * This function is the reason this class exists at all. - * See class description for why. - */ - if (hsplit.getBaseSplit() instanceof FileSplit){ - // if baseSplit is a FileSplit, then return that. 
- return ((FileSplit)hsplit.getBaseSplit()).getPath(); - } else { - // use reflection to try and determine if underlying class has a getPath() method that returns a path - Class c = hsplit.getBaseSplit().getClass(); - try { - return (Path) (c.getMethod("getPath")).invoke(hsplit.getBaseSplit()); - } catch (Exception e) { - HCatUtil.logStackTrace(LOG); - // not much we can do - default exit will return null Path - } - - } - LOG.error("Returning empty path from getPath(), Hive will not be happy."); - return new Path(""); // This will cause hive to error, but we can't do anything for that situation. - } - - public HCatSplit getHCatSplit() { - return hsplit; - } - -} Index: storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java =================================================================== --- storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (revision 1300322) +++ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java (working copy) @@ -92,7 +92,7 @@ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); Job job = new Job(conf); - inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); + inputInfo.getJobProperties().put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); InitializeInput.setInput(job, inputInfo); String modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); @@ -122,7 +122,7 @@ revMap.put("cf1", 3L); hbaseSnapshot = new TableSnapshot(fullyQualTableName, revMap, -1); inputInfo = InputJobInfo.create(databaseName, tableName, null); - inputInfo.getProperties().setProperty(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); + inputInfo.getJobProperties().put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, "dummysnapshot"); InitializeInput.setInput(job, inputInfo); modifiedInputInfo = job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); inputInfo = (InputJobInfo) HCatUtil.deserialize(modifiedInputInfo); Index: storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java =================================================================== --- storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (revision 1300322) +++ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (working copy) @@ -84,7 +84,7 @@ private transient HBaseAdmin admin; @Override - public void configureInputJobProperties(TableDesc tableDesc, Map jobProperties) { + public void configureInputJobProperties(TableDesc tableDesc) { // Populate jobProperties with input table name, table columns, RM snapshot, // hbase-default.xml and hbase-site.xml Map tableJobProperties = tableDesc.getJobProperties(); @@ -93,23 +93,23 @@ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString); HCatTableInfo tableInfo = inputJobInfo.getTableInfo(); String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo); - jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName); + tableJobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName); Configuration jobConf = getConf(); String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); - jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema)); + tableJobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, 
outputSchema)); - String serSnapshot = (String) inputJobInfo.getProperties().get( + String serSnapshot = (String) inputJobInfo.getJobProperties().get( HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY); if (serSnapshot == null) { Configuration conf = addHbaseResources(jobConf); HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf, qualifiedTableName, tableInfo); - jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, + tableJobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, HCatUtil.serialize(snapshot)); } - addHbaseResources(jobConf, jobProperties); + addHbaseResources(jobConf, tableJobProperties); } catch (IOException e) { throw new IllegalStateException("Error while configuring job properties", e);
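
Note for reviewers: below is a minimal Java sketch (not part of this patch) of what a handler implementation looks like under the revised single-argument contract, i.e. configureInputJobProperties(TableDesc) with the separate jobProperties map removed. ExampleStorageHandler and EXAMPLE_SCAN_KEY are hypothetical names used only for illustration; the pieces the patch actually relies on are TableDesc.getJobProperties()/setJobProperties() and the fact that HCatUtil.configureInputJobProperties() wires InputJobInfo.getJobProperties() into the TableDesc before invoking the handler.

package org.apache.hcatalog.examples; // hypothetical package, for illustration only

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.plan.TableDesc;

/**
 * Sketch of a storage handler under the revised contract in this patch.
 * Only configureInputJobProperties(TableDesc) is shown; other handler
 * methods are omitted, so the class is left abstract.
 */
public abstract class ExampleStorageHandler {

    // Hypothetical property key, purely illustrative.
    private static final String EXAMPLE_SCAN_KEY = "example.input.scan.columns";

    public void configureInputJobProperties(TableDesc tableDesc) {
        // With the jobProperties argument gone, the handler writes into the
        // map carried by the TableDesc itself.
        Map<String, String> jobProperties = tableDesc.getJobProperties();
        if (jobProperties == null) {
            // HCatUtil.configureInputJobProperties() pre-populates this map from
            // InputJobInfo.getJobProperties(); the null check just keeps the
            // sketch self-contained.
            jobProperties = new HashMap<String, String>();
            tableDesc.setJobProperties(jobProperties);
        }

        // Read "context" from the TableDesc (table/serde properties) and record
        // whatever the bundled InputFormat/SerDe will need in the backend JobConf.
        jobProperties.put(EXAMPLE_SCAN_KEY,
                tableDesc.getProperties().getProperty("columns", "*"));
    }
}

On the caller side, the corresponding change is visible in TestSnapshots.java: values formerly set through inputInfo.getProperties().setProperty(...) now go through inputInfo.getJobProperties().put(...), and since HCatUtil.configureInputJobProperties() sets that same map as the TableDesc's job properties, anything the handler adds ends up in the job's configuration.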