diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 5dfa7ca974..93f4bc2561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -28,9 +27,6 @@
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -44,7 +40,6 @@
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import java.io.IOException;
@@ -52,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,9 +57,8 @@
* A class to clean directories after compactions. This will run in a separate thread.
*/
public class Cleaner extends MetaStoreCompactorThread {
- static final private String CLASS_NAME = Cleaner.class.getName();
- static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- private long cleanerCheckInterval = 0;
+
+ private long cleanerCheckInterval = 0L;
private ReplChangeManager replChangeManager;
@@ -75,7 +70,7 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
@Override
public void run() {
- if (cleanerCheckInterval == 0) {
+ if (cleanerCheckInterval == 0L) {
cleanerCheckInterval = conf.getTimeVar(
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
}
@@ -86,7 +81,7 @@ public void run() {
// first to make sure we go through a complete iteration of the loop before resetting it.
boolean setLooped = !looped.get();
TxnStore.MutexAPI.LockHandle handle = null;
- long startedAt = -1;
+ long startedAt = -1L;
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
try {
@@ -97,8 +92,7 @@ public void run() {
clean(compactionInfo, minOpenTxnId);
}
} catch (Throwable t) {
- LOG.error("Caught an exception in the main loop of compactor cleaner, " +
- StringUtils.stringifyException(t));
+ LOG.error("Caught an exception in the main loop of compactor cleaner", t);
}
finally {
if (handle != null) {
@@ -125,18 +119,17 @@ public void run() {
private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException {
LOG.info("Starting cleaning for " + ci);
try {
- Table t = resolveTable(ci);
- if (t == null) {
+      Optional<Table> t = resolveTable(ci);
+ if (!t.isPresent()) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
- Partition p = null;
- if (ci.partName != null) {
- p = resolvePartition(ci);
- if (p == null) {
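+      // An empty Optional from resolvePartition covers both table-level requests and dropped partitions;
+      // ci.partName distinguishes the two cases below.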
+      Optional<Partition> p = resolvePartition(ci);
+ if (!p.isPresent()) {
+ if (ci.partName != null) {
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition " + ci.getFullPartitionName() +
", assuming it was dropped." + idWatermark(ci));
@@ -144,7 +137,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException {
return;
}
}
- StorageDescriptor sd = resolveStorageDescriptor(t, p);
+ StorageDescriptor sd = resolveStorageDescriptor(t.get(), p);
final String location = sd.getLocation();
ValidTxnList validTxnList =
TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
@@ -181,7 +174,7 @@ private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException {
* useful if there is all of a sudden a flood of aborted txns. (For another day).
*/
      List<String> tblNames = Collections.singletonList(
- TableName.getDbTable(t.getDbName(), t.getTableName()));
+ TableName.getDbTable(t.get().getDbName(), t.get().getTableName()));
GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames);
rqst.setValidTxnList(validTxnList.writeToString());
GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst);
@@ -215,8 +208,7 @@ public Object run() throws Exception {
}
txnHandler.markCleaned(ci);
} catch (Exception e) {
- LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
- StringUtils.stringifyException(e));
+ LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci, e);
txnHandler.markFailed(ci);
}
}
@@ -251,9 +243,8 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
" obsolete directories from " + location + ". " + extraDebugInfo.toString());
- if (filesToDelete.size() < 1) {
- LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
- ", that hardly seems right.");
+ if (filesToDelete.isEmpty()) {
+ LOG.warn("Nothing to delete in the cleaner for directory: " + location);
return;
}
@@ -262,7 +253,7 @@ private void removeFiles(String location, ValidWriteIdList writeIdList, Compacti
Boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db);
for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
+ LOG.debug("Going to delete path {}", dead);
if (isSourceOfRepl) {
replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
new file mode 100644
index 0000000000..54f4626d1b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+/**
+ * Signals that an exception of some sort has occurred during compaction.
+ */
+public class CompactionException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public CompactionException(String message) {
+ super(message);
+ }
+
+ public CompactionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index ee2c0f3e23..df784b62c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -78,7 +78,6 @@
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import org.apache.parquet.Strings;
import org.apache.thrift.TException;
@@ -287,14 +286,14 @@ void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor
Path path = stat.getFileStatus().getPath();
//note that originalFiles are all original files recursively not dirs
dirsToSearch.add(path);
- LOG.debug("Adding original file " + path + " to dirs to search");
+ LOG.debug("Adding original file {} to dirs to search", path);
}
// Set base to the location so that the input format reads the original files.
baseDir = new Path(sd.getLocation());
}
} else {
// add our base to the list of directories to search for files in.
- LOG.debug("Adding base directory " + baseDir + " to dirs to search");
+ LOG.debug("Adding base directory {} to dirs to search", baseDir);
dirsToSearch.add(baseDir);
}
}
@@ -363,7 +362,7 @@ private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compa
long minTxn = Long.MAX_VALUE;
long maxTxn = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta delta : parsedDeltas) {
- LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
+ LOG.debug("Adding delta {} to directories to search", delta.getPath());
dirsToSearch.add(delta.getPath());
deltaDirs.add(delta.getPath());
minTxn = Math.min(minTxn, delta.getMinWriteId());
@@ -441,21 +440,6 @@ private void setColumnTypes(JobConf job, List cols) {
HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
}
- // Remove the directories for aborted transactions only
- private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
- // For MM table, we only want to delete delta dirs for aborted txns.
-    List<Path> filesToDelete = dir.getAbortedDirectories();
- if (filesToDelete.size() < 1) {
- return;
- }
- LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
- FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
- for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
- fs.delete(dead, true);
- }
- }
-
public JobConf getMrJob() {
return mrJob;
}
@@ -510,7 +494,7 @@ public long getLength() throws IOException {
@Override
public String[] getLocations() throws IOException {
- return locations.toArray(new String[locations.size()]);
+ return locations.toArray(new String[0]);
}
@Override
@@ -543,20 +527,20 @@ public void readFields(DataInput dataInput) throws IOException {
locations = new ArrayList();
length = dataInput.readLong();
- LOG.debug("Read length of " + length);
+ LOG.debug("Read length of {}", length);
int numElements = dataInput.readInt();
- LOG.debug("Read numElements of " + numElements);
+ LOG.debug("Read numElements of {}", numElements);
for (int i = 0; i < numElements; i++) {
len = dataInput.readInt();
- LOG.debug("Read file length of " + len);
+ LOG.debug("Read file length of {}", len);
buf = new byte[len];
dataInput.readFully(buf);
locations.add(new String(buf));
}
bucketNum = dataInput.readInt();
- LOG.debug("Read bucket number of " + bucketNum);
+ LOG.debug("Read bucket number of {}", bucketNum);
len = dataInput.readInt();
- LOG.debug("Read base path length of " + len);
+ LOG.debug("Read base path length of {}", len);
if (len > 0) {
buf = new byte[len];
dataInput.readFully(buf);
@@ -667,8 +651,8 @@ public String toString() {
bt.sawBase ? baseDir : null, deltaDirs));
}
- LOG.debug("Returning " + splits.size() + " splits");
- return splits.toArray(new InputSplit[splits.size()]);
+ LOG.debug("Returning {} splits", splits.size());
+ return splits.toArray(new InputSplit[0]);
}
@Override
@@ -693,7 +677,7 @@ private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
bt = new BucketTracker();
splitToBucketMap.put(bucketNum, bt);
}
- LOG.debug("Adding " + file.toString() + " to list of files for splits");
+ LOG.debug("Adding {} to list of files for splits", file);
bt.buckets.add(file);
bt.sawBase |= sawBase;
}
@@ -925,15 +909,9 @@ public String toString() {
LOG.error(s);
throw new IOException(s);
}
- } catch (ClassNotFoundException e) {
- LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
- throw new IOException(e);
- } catch (InstantiationException e) {
- LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- LOG.error("Unable to instantiate class, " + StringUtils.stringifyException(e));
- throw new IOException(e);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ LOG.error("Unable to instantiate class", e);
+ throw new IOException("Unable to instantiate class", e);
}
return t;
}
@@ -971,8 +949,7 @@ public void commitJob(JobContext context) throws IOException {
Path tmpLocation = new Path(conf.get(TMP_LOCATION));//this contains base_xxx or delta_xxx_yyy
Path finalLocation = new Path(conf.get(FINAL_LOCATION));
FileSystem fs = tmpLocation.getFileSystem(conf);
- LOG.debug("Moving contents of " + tmpLocation.toString() + " to " +
- finalLocation.toString());
+ LOG.debug("Moving contents of {} to {}", tmpLocation, finalLocation);
if(!fs.exists(tmpLocation)) {
/**
* No 'tmpLocation' may happen if job generated created 0 splits, which happens if all
@@ -1015,7 +992,7 @@ public void abortJob(JobContext context, int status) throws IOException {
JobConf conf = ShimLoader.getHadoopShims().getJobConf(context);
Path tmpLocation = new Path(conf.get(TMP_LOCATION));
FileSystem fs = tmpLocation.getFileSystem(conf);
- LOG.debug("Removing " + tmpLocation.toString());
+ LOG.debug("Removing {}", tmpLocation);
fs.delete(tmpLocation, true);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 99da86f910..52318edd8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -40,7 +39,11 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,8 +52,8 @@
* Superclass for all threads in the compactor.
*/
public abstract class CompactorThread extends Thread implements Configurable {
- static final private String CLASS_NAME = CompactorThread.class.getName();
- protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
protected HiveConf conf;
protected RawStore rs;
@@ -89,11 +92,11 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
* Find the table being compacted
* @param ci compaction info returned from the compaction queue
* @return metastore table
- * @throws org.apache.hadoop.hive.metastore.api.MetaException if the table cannot be found.
+ * @throws CompactionException if the table cannot be found.
*/
- abstract Table resolveTable(CompactionInfo ci) throws MetaException;
+  abstract Optional<Table> resolveTable(CompactionInfo ci) throws CompactionException;
- abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException;
+ abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException;
/**
* Get list of partitions by name.
@@ -101,36 +104,41 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
* @return list of partitions
* @throws MetaException if an error occurs.
*/
-  abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException;
+  abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws CompactionException;
/**
* Get the partition being compacted.
+ *
* @param ci compaction info returned from the compaction queue
- * @return metastore partition, or null if there is not partition in this compaction info
- * @throws Exception if underlying calls throw, or if the partition name resolves to more than
- * one partition.
+ * @return a metastore partition, if a partition is present, otherwise an
+ * empty Optional
+   * @throws CompactionException if underlying calls to retrieve partition
+   *           information fail, or if the partition name resolves to more than
+ * one partition.
*/
- protected Partition resolvePartition(CompactionInfo ci) throws Exception {
- if (ci.partName != null) {
-      List<Partition> parts;
- try {
- parts = getPartitionsByNames(ci);
- if (parts == null || parts.size() == 0) {
- // The partition got dropped before we went looking for it.
- return null;
- }
- } catch (Exception e) {
- LOG.error("Unable to find partition " + ci.getFullPartitionName() + ", " + e.getMessage());
- throw e;
- }
- if (parts.size() != 1) {
- LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
- throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
- }
- return parts.get(0);
- } else {
- return null;
+  protected Optional<Partition> resolvePartition(final CompactionInfo ci) throws CompactionException {
+ Objects.requireNonNull(ci, "CompactionInfo cannot be null");
+ if (ci.partName == null) {
+ return Optional.empty();
}
+    List<Partition> parts = Collections.emptyList();
+ try {
+ parts = getPartitionsByNames(ci);
+ } catch (Exception e) {
+ throw new CompactionException("Failed to find partition " + ci.getFullPartitionName(), e);
+ }
+
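+    // Use an iterator so a single pass can confirm the name resolved to exactly one partition.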
+    final Iterator<Partition> iter = parts.iterator();
+ if (!iter.hasNext()) {
+ LOG.debug("The partition got dropped before we went looking for it");
+ return Optional.empty();
+ }
+
+ final Partition partition = iter.next();
+ if (iter.hasNext()) {
+ throw new CompactionException("Too many partitions for: " + ci.getFullPartitionName());
+ }
+ return Optional.of(partition);
}
/**
@@ -139,8 +147,8 @@ protected Partition resolvePartition(CompactionInfo ci) throws Exception {
* @param p table from {@link #resolvePartition(org.apache.hadoop.hive.metastore.txn.CompactionInfo)}
* @return metastore storage descriptor.
*/
- protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
- return (p == null) ? t.getSd() : p.getSd();
+  protected StorageDescriptor resolveStorageDescriptor(Table t, Optional<Partition> p) {
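+    // Prefer the partition's storage descriptor when one is present; otherwise fall back to the table's.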
+ return (p.isPresent()) ? p.get().getSd() : t.getSd();
}
/**
@@ -155,12 +163,12 @@ protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
*/
protected String findUserToRunAs(String location, Table t) throws IOException,
InterruptedException {
- LOG.debug("Determining who to run the job as.");
+ LOG.debug("Determining who to run the job as");
final Path p = new Path(location);
final FileSystem fs = p.getFileSystem(conf);
try {
FileStatus stat = fs.getFileStatus(p);
- LOG.debug("Running job as " + stat.getOwner());
+ LOG.debug("Running job as {}", stat.getOwner());
return stat.getOwner();
} catch (AccessControlException e) {
// TODO not sure this is the right exception
@@ -187,13 +195,12 @@ public Object run() throws Exception {
}
if (wrapper.size() == 1) {
- LOG.debug("Running job as " + wrapper.get(0));
+ LOG.debug("Running job as {}", wrapper.get(0));
return wrapper.get(0);
}
}
- LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() +
- ") or table owner(" + t.getOwner() + "), giving up");
- throw new IOException("Unable to stat file: " + p);
+ throw new IOException("Unable to stat file " + p + " as either current user (" + UserGroupInformation.getLoginUser()
+ + ") or table owner (" + t.getOwner() + "), giving up");
}
/**
@@ -215,7 +222,6 @@ protected String tableName(Table t) {
public static void initializeAndStartThread(CompactorThread thread,
Configuration conf) throws Exception {
- LOG.info("Starting compactor thread of type " + thread.getClass().getName());
thread.setConf(conf);
thread.setThreadId(nextThreadId.incrementAndGet());
thread.init(new AtomicBoolean(), new AtomicBoolean());
@@ -224,10 +230,6 @@ public static void initializeAndStartThread(CompactorThread thread,
protected boolean replIsCompactionDisabledForTable(Table tbl) {
// Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
- boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
- if (isCompactDisabled) {
- LOG.info("Compaction is disabled for table " + tbl.getTableName());
- }
- return isCompactDisabled;
+ return ReplUtils.isFirstIncPending(tbl.getParameters());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 7a0e32463d..1cb190dc8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -45,8 +45,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -54,6 +52,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,8 +63,6 @@
* It's critical that there exactly 1 of these in a given warehouse.
*/
public class Initiator extends MetaStoreCompactorThread {
- static final private String CLASS_NAME = Initiator.class.getName();
- static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
  private Map<String, String> tblNameOwnersCache = new HashMap<>();
@@ -108,14 +105,15 @@ public void run() {
for (CompactionInfo ci : potentials) {
// Disable minor compaction for query based compactor
if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
- LOG.debug("Not compacting: " + ci.getFullPartitionName()
- + ", as query based compaction currently does not " + "support minor compactions.");
+ LOG.debug(
+ "Not compacting: {}, as query based compaction currently does not support minor compactions",
+ ci.getFullPartitionName());
continue;
}
LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
try {
- Table t = resolveTable(ci);
+          Optional<Table> t = resolveTable(ci);
// Check if we already have initiated or are working on a compaction for this partition
// or table. If so, skip it. If we are just waiting on cleaning we can still check,
@@ -135,9 +133,9 @@ public void run() {
}
// Figure out who we should run the file operations as
- Partition p = resolvePartition(ci);
- if (p == null && ci.partName != null) {
- LOG.info("Can't find partition " + ci.getFullPartitionName() +
+          Optional<Partition> p = resolvePartition(ci);
+ if (!p.isPresent() && ci.partName != null) {
+ LOG.info("Failed to find partition " + ci.getFullPartitionName() +
", assuming it has been dropped and moving on.");
continue;
}
@@ -145,26 +143,26 @@ public void run() {
.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
// The response will have one entry per table and hence we get only one ValidWriteIdList
- String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+ String fullTableName = TxnUtils.getFullTableName(t.get().getDbName(), t.get().getTableName());
GetValidWriteIdsRequest rqst
= new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
rqst.setValidTxnList(validTxnList.writeToString());
final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
- StorageDescriptor sd = resolveStorageDescriptor(t, p);
+ StorageDescriptor sd = resolveStorageDescriptor(t.get(), p);
String runAs = tblNameOwnersCache.get(fullTableName);
if (runAs == null) {
LOG.debug("unable to find the table owner in the cache for table "+ fullTableName + " " +
"will determine user based on table location");
- runAs = findUserToRunAs(sd.getLocation(), t);
+ runAs = findUserToRunAs(sd.getLocation(), t.get());
tblNameOwnersCache.put(fullTableName, runAs);
}
/*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
* Long term we should consider having a thread pool here and running checkForCompactionS
* in parallel*/
CompactionType compactionNeeded
- = checkForCompaction(ci, tblValidWriteIds, sd, t.getParameters(), runAs);
+ = checkForCompaction(ci, tblValidWriteIds, sd, t.get().getParameters(), runAs);
if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
} catch (Throwable t) {
LOG.error("Caught exception while trying to determine if we should compact " +
@@ -183,8 +181,7 @@ public void run() {
// Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns.
txnHandler.cleanTxnToWriteIdTable();
} catch (Throwable t) {
- LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
- StringUtils.stringifyException(t));
+ LOG.error("Initiator loop caught unexpected exception this time through the loop", t);
}
finally {
if(handle != null) {
@@ -198,8 +195,7 @@ public void run() {
} while (!stop.get());
} catch (Throwable t) {
- LOG.error("Caught an exception in the main loop of compactor initiator, exiting " +
- StringUtils.stringifyException(t));
+ LOG.error("Caught an exception in the main loop of compactor initiator, exiting", t);
}
}
@@ -283,7 +279,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
if (base != null) {
stat = fs.getFileStatus(base);
if (!stat.isDir()) {
- LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!");
+ LOG.error("Was assuming base " + base + " is directory, but it's a file!");
return null;
}
baseSize = sumDirSize(fs, base);
@@ -299,7 +295,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
for (AcidUtils.ParsedDelta delta : deltas) {
stat = fs.getFileStatus(delta.getPath());
if (!stat.isDir()) {
- LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
+ LOG.error("Was assuming delta " + delta.getPath() + " is a directory, " +
"but it's a file!");
return null;
}
@@ -314,19 +310,12 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
float deltaPctThreshold = deltaPctProp == null ?
HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
Float.parseFloat(deltaPctProp);
- boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold;
- if (LOG.isDebugEnabled()) {
- StringBuilder msg = new StringBuilder("delta size: ");
- msg.append(deltaSize);
- msg.append(" base size: ");
- msg.append(baseSize);
- msg.append(" threshold: ");
- msg.append(deltaPctThreshold);
- msg.append(" will major compact: ");
- msg.append(bigEnough);
- LOG.debug(msg.toString());
+ boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+ LOG.debug("delta size: {} base size: {} threshold: {} will major compact: {}", deltaSize, baseSize,
+ deltaPctThreshold, bigEnough);
+ if (bigEnough) {
+ return CompactionType.MAJOR;
}
- if (bigEnough) return CompactionType.MAJOR;
}
String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
@@ -339,8 +328,8 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
return null;
}
if (AcidUtils.isInsertOnlyTable(tblproperties)) {
- LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size()
- + " delta files, threshold is " + deltaNumThreshold);
+ LOG.debug("Requesting a major compaction for a MM table; found {}" + " delta files, threshold is {}",
+ deltas.size(), deltaNumThreshold);
return CompactionType.MAJOR;
}
// TODO: this log statement looks wrong
@@ -352,7 +341,7 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi
}
private long sumDirSize(FileSystem fs, Path dir) throws IOException {
- long size = 0;
+ long size = 0L;
FileStatus[] buckets = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (int i = 0; i < buckets.length; i++) {
size += buckets[i].getLen();
@@ -385,43 +374,46 @@ private boolean noAutoCompactSet(Table t) {
// Check to see if this is a table level request on a partitioned table. If so,
// then it's a dynamic partitioning case and we shouldn't check the table itself.
- private static boolean checkDynPartitioning(Table t, CompactionInfo ci){
- if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
- ci.partName == null) {
- LOG.debug("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
- " partitioning");
- return true;
+  private boolean checkDynPartitioning(Table t, CompactionInfo ci) {
+ if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 && ci.partName == null) {
+ LOG.debug("Skipping entry for {} as it is from dynamic partitioning", ci.getFullTableName());
+ return true;
}
return false;
}
- private boolean checkCompactionElig(CompactionInfo ci){
- Table t = null;
+ private boolean checkCompactionElig(CompactionInfo ci) {
try {
- t = resolveTable(ci);
- if (t == null) {
- LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
- "table or has been dropped and moving on.");
+      Optional<Table> t = resolveTable(ci);
+ if (!t.isPresent()) {
+ LOG.info("Failed to find table " + ci.getFullTableName() + ", assuming table is a "
+ + "temp table or has been dropped. Moving on.");
return false;
}
if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+ LOG.info("Compaction is disabled for database " + ci.dbname);
return false;
}
- if (noAutoCompactSet(t)) {
- LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT +
- "=true so we will not compact it.");
+ if (noAutoCompactSet(t.get())) {
+ LOG.info("Table " + tableName(t.get()) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT
+ + "=true so we will not compact it.");
return false;
- } else if (replIsCompactionDisabledForTable(t)) {
+ }
+
+ if (replIsCompactionDisabledForTable(t.get())) {
+ LOG.info("Compaction is disabled for table " + t.get());
return false;
- } else if (checkDynPartitioning(t, ci)) {
+ }
+
+ if (checkDynPartitioning(t.get(), ci)) {
+ LOG.info("Dynamic partitioning in affect for table " + t.get());
return false;
}
} catch (Throwable e) {
- LOG.error("Caught Exception while checking compactiton eligibility " +
- StringUtils.stringifyException(e));
+ LOG.error("Caught Exception while checking compactiton eligibility", e);
}
return true;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 10681c0202..e40e84e346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -109,14 +108,13 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds,
compactorTxnId);
} catch (HiveException e) {
- LOG.error("Error doing query based major compaction", e);
- throw new IOException(e);
+ throw new IOException("Error doing query based major compaction", e);
} finally {
try {
DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
} catch (HiveException e) {
- LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
- LOG.error(ExceptionUtils.getStackTrace(e));
+ LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName,
+ e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 8bfb524ac6..dab1a3a21e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -30,10 +30,10 @@
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TException;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
@@ -65,40 +65,35 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL), threadId);
}
- @Override Table resolveTable(CompactionInfo ci) throws MetaException {
+ @Override
+  Optional<Table> resolveTable(final CompactionInfo ci) throws CompactionException {
try {
- return rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
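+      // getTable can return null when the table no longer exists; map that to an empty Optional.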
+ return Optional.ofNullable(rs.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName));
} catch (MetaException e) {
- LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
- throw e;
+ throw new CompactionException("Unable to find table " + ci.getFullTableName(), e);
}
}
- @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+ @Override
+ boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException {
try {
Database database = rs.getDatabase(getDefaultCatalog(conf), dbName);
- // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
- boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
- if (isReplCompactDisabled) {
- LOG.info("Compaction is disabled for database " + dbName);
- }
- return isReplCompactDisabled;
+ // Compaction is disabled until after first successful incremental load.
+ // Check HIVE-21197 for more detail.
+ return ReplUtils.isFirstIncPending(database.getParameters());
} catch (NoSuchObjectException e) {
- LOG.info("Unable to find database " + dbName);
+ LOG.info("Failed to find database " + dbName);
return true;
}
}
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
+  @Override
+  List<Partition> getPartitionsByNames(CompactionInfo ci) throws CompactionException {
try {
return rs.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
Collections.singletonList(ci.partName));
- } catch (MetaException e) {
- LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
- throw e;
- } catch (NoSuchObjectException e) {
- LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
- throw new MetaException(e.toString());
+ } catch (NoSuchObjectException | MetaException e) {
+ throw new CompactionException("Failed to get partition by name", e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 27a3ce8d2d..6fe9109831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -68,8 +68,8 @@
@Override
void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
- LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
- .getTableName());
+ LOG.debug("Going to delete directories for aborted transactions for MM table {}.{}", table.getDbName(),
+ table.getTableName());
AcidUtils.Directory dir = AcidUtils
.getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
table.getParameters(), false);
@@ -78,7 +78,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
// Then, actually do the compaction.
if (!compactionInfo.isMajorCompaction()) {
// Not supported for MM tables right now.
- LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction");
+ LOG.info("Not compacting {}; not a major compaction", storageDescriptor.getLocation());
return;
}
@@ -129,8 +129,7 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId);
DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName);
} catch (HiveException e) {
- LOG.error("Error compacting a MM table", e);
- throw new IOException(e);
+ throw new IOException("Error compacting a MM table", e);
}
}
@@ -138,13 +137,13 @@ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageD
private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
// For MM table, we only want to delete delta dirs for aborted txns.
    List<Path> filesToDelete = dir.getAbortedDirectories();
- if (filesToDelete.size() < 1) {
+ if (filesToDelete.isEmpty()) {
return;
}
LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
+ LOG.debug("Going to delete path {}", dead);
fs.delete(dead, true);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 80119de22f..5d39f4a1ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -37,7 +37,7 @@
*/
abstract class QueryCompactor {
- private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class);
private static final String TMPDIR = "_tmp";
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index 4235184fec..d4c3c937f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -20,7 +20,6 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -30,6 +29,7 @@
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
@@ -47,33 +47,37 @@ public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
}
- @Override Table resolveTable(CompactionInfo ci) throws MetaException {
+ @Override
+  Optional<Table> resolveTable(CompactionInfo ci) throws CompactionException {
try {
- return msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
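+      // Wrap the client result so callers can handle a dropped table without a null check.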
+ return Optional.ofNullable(msc.getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName));
} catch (TException e) {
- LOG.error("Unable to find table " + ci.getFullTableName() + ", " + e.getMessage());
- throw new MetaException(e.toString());
+ throw new CompactionException("Unable to find table " + ci.getFullTableName(), e);
}
}
- @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+ @Override
+ boolean replIsCompactionDisabledForDatabase(String dbName) throws CompactionException {
try {
Database database = msc.getDatabase(getDefaultCatalog(conf), dbName);
- // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+ // Compaction is disabled until after first successful incremental load.
+ // Check HIVE-21197 for more detail.
return ReplUtils.isFirstIncPending(database.getParameters());
} catch (NoSuchObjectException e) {
LOG.info("Unable to find database " + dbName);
return true;
+ } catch (TException e) {
+ throw new CompactionException("Unable to get database: " + dbName, e);
}
}
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
+  @Override
+  List<Partition> getPartitionsByNames(CompactionInfo ci) throws CompactionException {
try {
return msc.getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
Collections.singletonList(ci.partName));
} catch (TException e) {
- LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
- throw new MetaException(e.toString());
+ throw new CompactionException("Unable to get partitions by name for CompactionInfo=" + ci, e);
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 3270175a80..bed149a035 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -44,7 +44,6 @@
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.net.InetAddress;
@@ -53,6 +52,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,23 +97,23 @@ public void run() {
}
final CompactionInfo ci = CompactionInfo.optionalCompactionInfoStructToInfo(
msc.findNextCompact(workerName));
- LOG.debug("Processing compaction request " + ci);
+ LOG.debug("Processing compaction request {}", ci);
if (ci == null && !stop.get()) {
try {
Thread.sleep(SLEEP_TIME);
continue;
} catch (InterruptedException e) {
- LOG.warn("Worker thread sleep interrupted " + e.getMessage());
+ LOG.warn("Worker thread sleep interrupted", e);
continue;
}
}
// Find the table we will be working with.
- Table t1 = null;
+      Optional<Table> t1 = Optional.empty();
try {
t1 = resolveTable(ci);
- if (t1 == null) {
+ if (!t1.isPresent()) {
LOG.info("Unable to find table " + ci.getFullTableName() +
", assuming it was dropped and moving on.");
msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
@@ -125,13 +125,13 @@ public void run() {
}
// This chicanery is to get around the fact that the table needs to be final in order to
// go into the doAs below.
- final Table t = t1;
+ final Table t = t1.get();
// Find the partition we will be working with, if there is one.
- Partition p = null;
+      Optional<Partition> p = Optional.empty();
try {
p = resolvePartition(ci);
- if (p == null && ci.partName != null) {
+ if (!p.isPresent() && ci.partName != null) {
LOG.info("Unable to find partition " + ci.getFullPartitionName() +
", assuming it was dropped and moving on.");
msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
@@ -190,11 +190,11 @@ public void run() {
launchedJob = true;
try {
if (runJobAsSelf(ci.runAs)) {
- mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, msc);
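+          // CompactorMR still expects a nullable Partition, so unwrap the Optional at the boundary.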
+ mr.run(conf, jobName.toString(), t, p.orElse(null), sd, tblValidWriteIds, ci, su, msc);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
UserGroupInformation.getLoginUser());
- final Partition fp = p;
+ final Partition fp = p.orElse(null);
ugi.doAs(new PrivilegedExceptionAction