diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dffdb5c..e7ed07e 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1571,6 +1571,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms", new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"), + COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" + + "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."), HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s", new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"), HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s", diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 02fa725..3ee9346 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -117,6 +117,11 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, job.setInputFormat(CompactorInputFormat.class); job.setOutputFormat(NullOutputFormat.class); job.setOutputCommitter(CompactorOutputCommitter.class); + + String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE); + if(queueName != null && queueName.length() > 0) { + job.setQueueName(queueName); + } job.set(FINAL_LOCATION, sd.getLocation()); job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString()); @@ -189,7 +194,10 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, LOG.debug("Setting maximume transaction to " + maxTxn); RunningJob rj = JobClient.runJob(job); - LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + rj.getID()); + LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + + jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue. " + + "(current delta dirs count=" + dir.getCurrentDirectories().size() + + ", obsolete delta dirs count=" + dir.getObsolete()); rj.waitForCompletion(); su.gatherStats(); }