Index: src/java/org/apache/hcatalog/pig/PigHCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/pig/PigHCatUtil.java	(working copy)
@@ -29,7 +29,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -91,13 +90,8 @@
     return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
   }
 
-  static HiveMetaStoreClient client = null;
-
   private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
       String serverKerberosPrincipal, Class clazz) throws Exception {
-    if (client != null){
-      return client;
-    }
     HiveConf hiveConf = new HiveConf(clazz);
 
     if (serverUri != null){
@@ -111,11 +105,10 @@
     }
 
     try {
-      client = new HiveMetaStoreClient(hiveConf,null);
+      return new HiveMetaStoreClient(hiveConf,null);
     } catch (Exception e){
       throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:["+serverUri+"]",e);
     }
-    return client;
   }
 
@@ -146,6 +139,7 @@
     String dbName = dbTablePair.first;
     String tableName = dbTablePair.second;
     Table table = null;
+    HiveMetaStoreClient client = null;
     try {
       client = createHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
       table = client.getTable(dbName, tableName);
@@ -153,6 +147,8 @@
       throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
     } catch (Exception e) {
       throw new IOException(e);
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
     }
     hcatTableCache.put(loc_server, table);
     return table;
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java	(working copy)
@@ -113,11 +113,13 @@
   @Override
   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
     OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+    HiveMetaStoreClient client = null;
     try {
       HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
+      client = HCatUtil.createHiveClient(hiveConf);
       handleDuplicatePublish(context,
           jobInfo,
-          HCatUtil.createHiveClient(hiveConf),
+          client,
           jobInfo.getTableInfo().getTable());
     } catch (MetaException e) {
       throw new IOException(e);
@@ -125,6 +127,8 @@
       throw new IOException(e);
     } catch (NoSuchObjectException e) {
       throw new IOException(e);
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
     }
 
     if(!jobInfo.isDynamicPartitioningUsed()) {
Index: src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java	(working copy)
@@ -90,15 +90,18 @@
     getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
 
     //Cancel HCat and JobTracker tokens
+    HiveMetaStoreClient client = null;
     try {
       HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
-      HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
+      client = HCatUtil.createHiveClient(hiveConf);
       String tokenStrForm = client.getTokenStrForm();
       if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
         client.cancelDelegationToken(tokenStrForm);
       }
     } catch (Exception e) {
       LOG.warn("Failed to cancel delegation token", e);
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
     }
   }
 }
Index: src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java	(working copy)
@@ -224,22 +224,27 @@
 
   static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception {
     HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
-    HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
+    HiveMetaStoreClient client = null;
     // cancel the deleg. tokens that were acquired for this job now that
     // we are done - we should cancel if the tokens were acquired by
    // HCatOutputFormat and not if they were supplied by Oozie. In the latter
     // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
-    String tokenStrForm = client.getTokenStrForm();
-    if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-      client.cancelDelegationToken(tokenStrForm);
-    }
+    try {
+      client = HCatUtil.createHiveClient(hiveConf);
+      String tokenStrForm = client.getTokenStrForm();
+      if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+        client.cancelDelegationToken(tokenStrForm);
+      }
 
-    String jcTokenStrForm =
-      context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-    String jcTokenSignature =
-      context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
-    if(jcTokenStrForm != null && jcTokenSignature != null) {
-      HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+      String jcTokenStrForm =
+        context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+      String jcTokenSignature =
+        context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+      if(jcTokenStrForm != null && jcTokenSignature != null) {
+        HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+      }
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
     }
   }
 }
Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java	(working copy)
@@ -70,12 +70,11 @@
   @SuppressWarnings("unchecked")
   public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException {
     HiveMetaStoreClient client = null;
-    HiveConf hiveConf = null;
 
     try {
       Configuration conf = job.getConfiguration();
-      hiveConf = HCatUtil.getHiveConf(conf);
+      HiveConf hiveConf = HCatUtil.getHiveConf(conf);
       client = HCatUtil.createHiveClient(hiveConf);
       Table table = client.getTable(outputJobInfo.getDatabaseName(),
           outputJobInfo.getTableName());
@@ -199,9 +198,7 @@
         throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
       }
     } finally {
-      if( client != null ) {
-        client.close();
-      }
+      HCatUtil.closeHiveClientQuietly(client);
 //      HCatUtil.logAllTokens(LOG,job);
     }
   }
Index: src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java	(working copy)
@@ -47,17 +47,9 @@
  * info required in the client process context.
  */
 public class InitializeInput {
-  
+
   private static final Log LOG = LogFactory.getLog(InitializeInput.class);
 
-  private static HiveConf hiveConf;
-
-  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf) throws Exception {
-
-    hiveConf = HCatUtil.getHiveConf(conf);
-    return new HiveMetaStoreClient(hiveConf, null);
-  }
-
   /**
    * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
    * gets the matching partitions, puts the information in the configuration object.
@@ -71,7 +63,7 @@
 
     //* Serialize the InputJobInfo and save in the Job's Configuration object
     job.getConfiguration().set(
-        HCatConstants.HCAT_KEY_JOB_INFO, 
+        HCatConstants.HCAT_KEY_JOB_INFO,
         getSerializedHcatKeyJobInfo(job, inputJobInfo,null));
   }
 
@@ -79,14 +71,14 @@
     //* Create and initialize an InputJobInfo object
     HiveMetaStoreClient client = null;
-    
+    HiveConf hiveConf = null;
     try {
       if (job != null){
-        client = createHiveMetaClient(job.getConfiguration());
+        hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
       } else {
         hiveConf = new HiveConf(HCatInputFormat.class);
-        client = new HiveMetaStoreClient(hiveConf, null);
       }
+      client = HCatUtil.createHiveClient(hiveConf);
 
       Table table = client.getTable(inputJobInfo.getDatabaseName(),
                                     inputJobInfo.getTableName());
@@ -108,8 +100,8 @@
         // populate partition info
         for (Partition ptn : parts){
-          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), 
-                                              job.getConfiguration(), 
+          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(),
+                                              job.getConfiguration(),
                                               inputJobInfo);
           partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
           partInfoList.add(partInfo);
@@ -118,7 +110,7 @@
       }else{
         //Non partitioned table
         PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(),
-                                            job.getConfiguration(), 
+                                            job.getConfiguration(),
                                             inputJobInfo);
         partInfo.setPartitionValues(new HashMap<String,String>());
         partInfoList.add(partInfo);
@@ -127,13 +119,11 @@
 
       return HCatUtil.serialize(inputJobInfo);
     } finally {
-      if (client != null ) {
-        client.close();
-      }
+      HCatUtil.closeHiveClientQuietly(client);
     }
   }
-  
+
   private static Map<String,String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
     List<String> values = ptn.getValues();
     if( values.size() != table.getPartitionKeys().size() ) {
@@ -155,25 +145,25 @@
     return ptnKeyValues;
   }
 
-  static PartInfo extractPartInfo(StorageDescriptor sd, 
-      Map<String,String> parameters, Configuration conf, 
+  static PartInfo extractPartInfo(StorageDescriptor sd,
+      Map<String,String> parameters, Configuration conf,
       InputJobInfo inputJobInfo) throws IOException{
 
     HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
     StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
 
     Properties hcatProperties = new Properties();
-    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, 
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf,
         storerInfo);
 
     // copy the properties from storageHandler to jobProperties
     Map<String,String> jobProperties = HCatUtil.getInputJobProperties(
-        storageHandler, 
+        storageHandler,
         inputJobInfo);
 
     for (String key : parameters.keySet()){
       hcatProperties.put(key, parameters.get(key));
     }
 
-    // FIXME 
+    // FIXME
     // Bloating partinfo with inputJobInfo is not good
     return new PartInfo(schema, storageHandler, sd.getLocation(),
         hcatProperties,
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(working copy)
@@ -172,15 +172,14 @@
       }
     }
 
-    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-
+    HiveMetaStoreClient client = null;
     try {
       HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration());
-      HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
+      client = HCatUtil.createHiveClient(hiveConf);
       // cancel the deleg. tokens that were acquired for this job now that
       // we are done - we should cancel if the tokens were acquired by
       // HCatOutputFormat and not if they were supplied by Oozie.
-      // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in 
+      // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
       // the conf will not be set
       String tokenStrForm = client.getTokenStrForm();
       if(tokenStrForm != null && jobContext.getConfiguration().get
@@ -202,9 +201,12 @@
       } else {
         throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
       }
+    } finally {
+      HCatUtil.closeHiveClientQuietly(client);
     }
 
     Path src;
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
     if (dynamicPartitioningUsed){
       src = new Path(getPartitionRootLocation(
           jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
@@ -426,9 +428,7 @@
         throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
       }
     } finally {
-      if( client != null ) {
-        client.close();
-      }
+      HCatUtil.closeHiveClientQuietly(client);
     }
   }
Index: src/java/org/apache/hcatalog/common/HCatUtil.java
===================================================================
--- src/java/org/apache/hcatalog/common/HCatUtil.java	(revision 1311907)
+++ src/java/org/apache/hcatalog/common/HCatUtil.java	(working copy)
@@ -617,11 +617,19 @@
     }
   }
 
-  //TODO remove url component, everything should be encapsulated in HiveConf
-  public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf)
-      throws MetaException {
-    return new HiveMetaStoreClient(hiveConf);
+  public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf)
+      throws MetaException {
+    return new HiveMetaStoreClient(hiveConf);
   }
+
+  public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
+    try {
+      if (client != null)
+        client.close();
+    } catch (Exception e) {
+      LOG.debug("Error closing metastore client", e);
+    }
+  }
 
   public static HiveConf getHiveConf(Configuration conf)
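
Note (illustrative, not part of the patch): every call site touched above converges on the same lifecycle — declare the client reference before the try, acquire it inside, and release it in a finally via the new HCatUtil.closeHiveClientQuietly helper, which logs and swallows close() failures so they cannot mask the primary exception. A minimal sketch of that pattern using the helpers this patch defines; the class name MetaClientLifecycleSketch and method fetchTable are hypothetical, for demonstration only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hcatalog.common.HCatUtil;

    public class MetaClientLifecycleSketch {
      // Hypothetical caller demonstrating the acquire/use/close-quietly pattern.
      public static Table fetchTable(Configuration conf, String db, String tbl)
          throws Exception {
        // Declared outside the try so the finally block can see the reference.
        HiveMetaStoreClient client = null;
        try {
          HiveConf hiveConf = HCatUtil.getHiveConf(conf);
          client = HCatUtil.createHiveClient(hiveConf);
          return client.getTable(db, tbl);
        } finally {
          // Null-safe and exception-safe: if createHiveClient threw, client is
          // still null, which closeHiveClientQuietly tolerates.
          HCatUtil.closeHiveClientQuietly(client);
        }
      }
    }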