Index: src/java/org/apache/hcatalog/pig/PigHCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(revision 1232028)
+++ src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(working copy)
@@ -35,6 +35,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatArrayBag;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
@@ -68,12 +69,9 @@
     // <dbname>.<tablename> - parse it and
     // communicate the information to HCatInputFormat
 
-    String[] dbTableNametokens = location.split("\\.");
-    if(dbTableNametokens.length == 1) {
-      return new Pair<String, String>(DEFAULT_DB,location);
-    }else if (dbTableNametokens.length == 2) {
-      return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
-    }else{
+    try {
+      return HCatUtil.getDbAndTableName(location);
+    } catch (IOException e) {
       String locationErrMsg = "The input location in load statement " +
          "should be of the form " +
          "<dbname>.<tablename> or <tablename>. Got " + location;
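The delegation above changes only where the parsing lives, not what it does: the contract now sits in HCatUtil.getDbAndTableName(), defined in the HCatUtil hunk below. A minimal sketch of the expected behavior; Pair's public first/second fields are an assumption, so check the actual class before relying on them:

    import java.io.IOException;

    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.data.Pair;

    public class DbTableNameSketch {
      public static void main(String[] args) throws IOException {
        // Bare table name: resolved against the default database.
        Pair<String, String> p = HCatUtil.getDbAndTableName("mytable");
        System.out.println(p.first + "." + p.second);    // default.mytable

        // Qualified name: split on the single dot.
        p = HCatUtil.getDbAndTableName("mydb.mytable");
        System.out.println(p.first + "." + p.second);    // mydb.mytable

        // More than one dot is malformed; PigHCatUtil catches the
        // IOException and rewraps it with the load-statement message above.
        HCatUtil.getDbAndTableName("a.b.c");             // throws IOException
      }
    }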
. Got " + location; Index: src/java/org/apache/hcatalog/common/HCatConstants.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatConstants.java (revision 1232028) +++ src/java/org/apache/hcatalog/common/HCatConstants.java (working copy) @@ -75,6 +75,15 @@ public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig"; public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform"; + public static final String[] OUTPUT_CONFS_TO_SAVE = { + HCAT_KEY_OUTPUT_INFO, + HCAT_KEY_HIVE_CONF, + HCAT_KEY_TOKEN_SIGNATURE, + HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, + HCAT_KEY_JOBCLIENT_TOKEN_STRFORM + }; + + public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq"; public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration"; Index: src/java/org/apache/hcatalog/common/HCatUtil.java =================================================================== --- src/java/org/apache/hcatalog/common/HCatUtil.java (revision 1232028) +++ src/java/org/apache/hcatalog/common/HCatUtil.java (working copy) @@ -26,6 +26,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -55,6 +57,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; @@ -418,12 +421,17 @@ public static void logEntrySet(Log logger, String itemName, Set entrySet) { - logger.info(itemName + ":"); - for (Entry e : entrySet) { - logger.info("\t[" + e.getKey() + "]=>[" + e.getValue() + "]"); - } + logIterableSet(logger,itemName,entrySet.iterator()); } + public static void logIterableSet(Log logger, String itemName, Iterator iterator){ + logger.info(itemName + ":"); + while (iterator.hasNext()){ + Entry e = iterator.next(); + logger.debug("\t[" + e.getKey() + "]=>[" + e.getValue() + "]"); + } + } + public static void logAllTokens(Log logger, JobContext context) throws IOException { for (Token t : context.getCredentials() @@ -459,4 +467,15 @@ } } + public static Pair getDbAndTableName(String tableName) throws IOException{ + String[] dbTableNametokens = tableName.split("\\."); + if(dbTableNametokens.length == 1) { + return new Pair(MetaStoreUtils.DEFAULT_DATABASE_NAME,tableName); + }else if (dbTableNametokens.length == 2) { + return new Pair(dbTableNametokens[0], dbTableNametokens[1]); + }else{ + throw new IOException("tableName expected in the form " + +".
or
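Looking back at the HCatConstants hunk above: OUTPUT_CONFS_TO_SAVE only names the keys worth carrying across configurations; nothing in the hunks shown here consumes it yet. A hypothetical consumer (class and method names invented for illustration) would look like:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;

    public class OutputConfSketch {
      // Copy the saved HCat output keys from one Configuration to another,
      // skipping keys that were never set in the source.
      public static void copyOutputConfs(Configuration from, Configuration to) {
        for (String key : HCatConstants.OUTPUT_CONFS_TO_SAVE) {
          String value = from.get(key);
          if (value != null) {
            to.set(key, value);
          }
        }
      }
    }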
. Got " + tableName); + } + } } Index: src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (revision 1232028) +++ src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (working copy) @@ -19,17 +19,24 @@ import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; /** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on * the underlying record reader is done with the underlying split, not with HCatSplit. */ -class HCatRecordReader extends RecordReader { +class HCatRecordReader extends RecordReader + implements org.apache.hadoop.mapred.RecordReader { + + Log LOG = LogFactory.getLog(HCatRecordReader.class); + int lineCount = 0; /** The underlying record reader to delegate to. */ private final RecordReader baseRecordReader; @@ -74,15 +81,25 @@ */ @Override public HCatRecord getCurrentValue() throws IOException, InterruptedException { - return storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue()); + HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue()); + return r; } /* (non-Javadoc) * @see org.apache.hadoop.mapreduce.RecordReader#getProgress() */ @Override - public float getProgress() throws IOException, InterruptedException { - return baseRecordReader.getProgress(); + public float getProgress() { + try { + return baseRecordReader.getProgress(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + LOG.warn(e.getStackTrace()); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + LOG.warn(e.getStackTrace()); + } + return 0.0f; // errored } /* (non-Javadoc) @@ -90,6 +107,7 @@ */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { + lineCount++; return baseRecordReader.nextKeyValue(); } @@ -100,4 +118,46 @@ public void close() throws IOException { baseRecordReader.close(); } + + @Override + public Object createKey() { + WritableComparable o = null; + try { + o = getCurrentKey(); + } catch (IOException e) { + LOG.warn(e.getMessage()); + LOG.warn(e.getStackTrace()); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + LOG.warn(e.getStackTrace()); + } + return o; + } + + @Override + public Object createValue() { + return new DefaultHCatRecord(); + } + + @Override + public long getPos() throws IOException { + return lineCount; + } + + @Override + public boolean next(Object key, Object value) throws IOException { + try { + if (!nextKeyValue()){ + return false; + } + + ((HCatRecord)value).copy(getCurrentValue()); + + return true; + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + LOG.warn(e.getStackTrace()); + } + return false; + } } Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java =================================================================== --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (revision 1232028) +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (working copy) @@ -24,6 +24,8 @@ import 
Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision 1232028)
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(working copy)
@@ -24,6 +24,8 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,6 +48,8 @@
  * info required in the client process context.
  */
 public class InitializeInput {
+
+  private static final Log LOG = LogFactory.getLog(InitializeInput.class);
 
   /** The prefix for keys used for storage driver arguments */
   static final String HCAT_KEY_PREFIX = "hcat.";
@@ -69,10 +73,23 @@
     //* Create and initialize an InputJobInfo object
     //* Serialize the InputJobInfo and save in the Job's Configuration object
 
+    job.getConfiguration().set(
+        HCatConstants.HCAT_KEY_JOB_INFO,
+        getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
+  }
+
+  public static String getSerializedHcatKeyJobInfo(Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+    //* Create and initialize an InputJobInfo object
+
     HiveMetaStoreClient client = null;
 
     try {
-      client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+      if (job != null){
+        client = createHiveMetaClient(job.getConfiguration(),inputJobInfo);
+      } else {
+        HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+        client = new HiveMetaStoreClient(hiveConf, null);
+      }
 
       Table table = client.getTable(inputJobInfo.getDatabaseName(),
           inputJobInfo.getTableName());
@@ -107,17 +124,15 @@
       inputJobInfo.setPartitions(partInfoList);
       inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
 
-      job.getConfiguration().set(
-          HCatConstants.HCAT_KEY_JOB_INFO,
-          HCatUtil.serialize(inputJobInfo)
-      );
+      return HCatUtil.serialize(inputJobInfo);
     } finally {
       if (client != null ) {
         client.close();
       }
     }
+  }
 
-
   private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
     List<String> values = ptn.getValues();
     if( values.size() != table.getPartitionKeys().size() ) {
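The point of the extracted getSerializedHcatKeyJobInfo() is the job == null branch: a caller without a live mapreduce Job can still obtain the serialized input description and stash it in its own Configuration. A sketch of that path; the InputJobInfo.create() arguments shown are an assumption, since that factory's signature has varied across HCatalog branches, so verify it before use:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.mapreduce.InitializeInput;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class SerializeJobInfoSketch {
      public static void stash(Configuration conf) throws Exception {
        InputJobInfo info = InputJobInfo.create("mydb", "mytable", null);
        // With job == null, InitializeInput builds its own HiveConf and
        // metastore client instead of reading the Job's configuration.
        String serialized = InitializeInput.getSerializedHcatKeyJobInfo(null, info, null);
        conf.set(HCatConstants.HCAT_KEY_JOB_INFO, serialized);
      }
    }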
Index: src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatSplit.java	(revision 1232028)
+++ src/java/org/apache/hcatalog/mapreduce/HCatSplit.java	(working copy)
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -29,8 +31,10 @@
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-class HCatSplit extends InputSplit implements Writable {
+public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
 
+  Log LOG = LogFactory.getLog(HCatSplit.class);
+
   /** The partition info for the split. */
   private PartInfo partitionInfo;
 
@@ -94,16 +98,34 @@
    * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
    */
   @Override
-  public long getLength() throws IOException, InterruptedException {
-    return baseSplit.getLength();
+  public long getLength() {
+    try {
+      return baseSplit.getLength();
+    } catch (IOException e) {
+      LOG.warn(e.getMessage());
+      LOG.warn(e.getStackTrace());
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      LOG.warn(e.getStackTrace());
+    }
+    return 0; // we errored
   }
 
   /* (non-Javadoc)
    * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
   */
   @Override
-  public String[] getLocations() throws IOException, InterruptedException {
-    return baseSplit.getLocations();
+  public String[] getLocations() {
+    try {
+      return baseSplit.getLocations();
+    } catch (IOException e) {
+      LOG.warn(e.getMessage());
+      LOG.warn(e.getStackTrace());
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      LOG.warn(e.getStackTrace());
+    }
+    return new String[0]; // we errored
   }
 
   /* (non-Javadoc)
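The signature changes in getLength() and getLocations() are forced by the dual parentage: the mapreduce side declares both checked exceptions, while org.apache.hadoop.mapred.InputSplit allows only IOException, so the overrides here drop the throws clauses entirely and log what they swallow, which is the simplest way to satisfy both. A condensed view of the two contracts the class now straddles (method shapes as in Hadoop 0.20/1.x; the type names below are illustrative, not real Hadoop classes):

    import java.io.IOException;

    // cf. org.apache.hadoop.mapreduce.InputSplit
    abstract class MapreduceStyleSplit {
      public abstract long getLength() throws IOException, InterruptedException;
      public abstract String[] getLocations() throws IOException, InterruptedException;
    }

    // cf. org.apache.hadoop.mapred.InputSplit (which also extends Writable)
    interface MapredStyleSplit {
      long getLength() throws IOException;        // no InterruptedException allowed here
      String[] getLocations() throws IOException;
    }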