diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
index 8b927af..fcf110c 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
@@ -333,13 +333,19 @@ public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {
 
   public static Token getJobTrackerDelegationToken(
     Configuration conf, String userName) throws Exception {
-    // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
-    JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
-    Token t = jcl
-      .getDelegationToken(new Text(userName));
-    // LOG.info("got "+t);
-    return t;
-
+    JobClient jcl = null;
+    try {
+      // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+      jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+      Token t = jcl
+        .getDelegationToken(new Text(userName));
+      // LOG.info("got "+t);
+      return t;
+    } finally {
+      if (jcl != null) {
+        jcl.close();
+      }
+    }
     // return null;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index da99c23..b1afa17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -85,7 +85,7 @@ public int execute(DriverContext driverContext) {
     boolean ctxCreated = false;
     RunningJob rj = null;
     int returnVal = 0;
-
+    JobClient jc = null;
     try {
       if (ctx == null) {
         ctx = new Context(job);
@@ -141,7 +141,7 @@ public int execute(DriverContext driverContext) {
       }
 
       // submit the job
-      JobClient jc = new JobClient(job);
+      jc = new JobClient(job);
 
       String addedJars = Utilities.getResourceFiles(job,
           SessionState.ResourceType.JAR);
@@ -193,6 +193,9 @@ public int execute(DriverContext driverContext) {
           }
         }
       }
+      if (jc != null) {
+        jc.close();
+      }
     } catch (Exception e) {
       // jobClose needs to execute successfully otherwise fail task
       LOG.warn("Job close failed ",e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index ad921f3..6eee717 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -107,7 +107,7 @@ public int execute(DriverContext driverContext) {
     HiveFileFormatUtils.prepareJobOutput(job);
     job.setOutputFormat(HiveOutputFormatImpl.class);
     job.setMapperClass(work.getMapperClass());
-
+    JobClient jc = null;
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
     try {
@@ -189,7 +189,7 @@ public int execute(DriverContext driverContext) {
       if (pwd != null) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
       }
-      JobClient jc = new JobClient(job);
+      jc = new JobClient(job);
 
       String addedJars = Utilities.getResourceFiles(job, SessionState.ResourceType.JAR);
       if (!addedJars.isEmpty()) {
@@ -249,6 +249,9 @@ public int execute(DriverContext driverContext) {
           rj.killJob();
         }
       }
+      if (jc != null) {
+        jc.close();
+      }
     } catch (Exception e) {
       LOG.warn("Failed in cleaning up ", e);
     } finally {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8e89b71..e825a6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -85,7 +85,7 @@ public int execute(DriverContext driverContext) {
     HiveFileFormatUtils.prepareJobOutput(job);
     job.setOutputFormat(HiveOutputFormatImpl.class);
     job.setMapperClass(work.getMapperClass());
-
+    JobClient jc = null;
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
     try {
@@ -174,7 +174,8 @@ public int execute(DriverContext driverContext) {
       if (pwd != null) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
       }
-      JobClient jc = new JobClient(job);
+
+      jc = new JobClient(job);
 
       String addedJars = Utilities.getResourceFiles(job, SessionState.ResourceType.JAR);
       if (!addedJars.isEmpty()) {
@@ -219,6 +220,9 @@ public int execute(DriverContext driverContext) {
       }
       ColumnTruncateMapper.jobClose(outputPath, success, job, console,
           work.getDynPartCtx(), null);
+      if (jc != null) {
+        jc.close();
+      }
     } catch (Exception e) {
       LOG.warn("Failed while cleaning up ", e);
     } finally {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..8b9633f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -280,42 +280,50 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa
                                    List parsedDeltas, int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
                                    TxnStore txnHandler, long id, String jobName) throws IOException {
-    job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
-    if(dirsToSearch == null) {
-      dirsToSearch = new StringableList();
-    }
-    StringableList deltaDirs = new StringableList();
-    long minTxn = Long.MAX_VALUE;
-    long maxTxn = Long.MIN_VALUE;
-    for (AcidUtils.ParsedDelta delta : parsedDeltas) {
-      LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
-      dirsToSearch.add(delta.getPath());
-      deltaDirs.add(delta.getPath());
-      minTxn = Math.min(minTxn, delta.getMinTransaction());
-      maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
-    }
-
-    if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
-    job.set(DELTA_DIRS, deltaDirs.toString());
-    job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
-    job.setLong(MIN_TXN, minTxn);
-    job.setLong(MAX_TXN, maxTxn);
-
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
-      mrJob = job;
-    }
-
-    LOG.info("Submitting " + compactionType + " compaction job '" +
-        job.getJobName() + "' to " + job.getQueueName() + " queue. " +
-        "(current delta dirs count=" + curDirNumber +
-        ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
-    RunningJob rj = new JobClient(job).submitJob(job);
-    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
-    txnHandler.setHadoopJobId(rj.getID().toString(), id);
-    rj.waitForCompletion();
-    if (!rj.isSuccessful()) {
-      throw new IOException(compactionType == CompactionType.MAJOR ? "Major" : "Minor" +
-          " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID() );
+    JobClient jc = null;
+    try {
+      job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
+      if(dirsToSearch == null) {
+        dirsToSearch = new StringableList();
+      }
+      StringableList deltaDirs = new StringableList();
+      long minTxn = Long.MAX_VALUE;
+      long maxTxn = Long.MIN_VALUE;
+      for (AcidUtils.ParsedDelta delta : parsedDeltas) {
+        LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
+        dirsToSearch.add(delta.getPath());
+        deltaDirs.add(delta.getPath());
+        minTxn = Math.min(minTxn, delta.getMinTransaction());
+        maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
+      }
+
+      if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
+      job.set(DELTA_DIRS, deltaDirs.toString());
+      job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
+      job.setLong(MIN_TXN, minTxn);
+      job.setLong(MAX_TXN, maxTxn);
+
+      if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+        mrJob = job;
+      }
+
+      LOG.info("Submitting " + compactionType + " compaction job '" +
+          job.getJobName() + "' to " + job.getQueueName() + " queue. " +
+          "(current delta dirs count=" + curDirNumber +
+          ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
+      jc = new JobClient(job);
+      RunningJob rj = jc.submitJob(job);
+      LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
+      txnHandler.setHadoopJobId(rj.getID().toString(), id);
+      rj.waitForCompletion();
+      if (!rj.isSuccessful()) {
+        throw new IOException(compactionType == CompactionType.MAJOR ? "Major" : "Minor" +
+            " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID() );
+      }
+    } finally {
+      if (jc != null) {
+        jc.close();
+      }
     }
   }
 
   /**