diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/DDLTask.java index 3e4496471f..a2d5dd6a6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/DDLTask.java @@ -92,7 +92,7 @@ private void failed(Throwable e) { e = e.getCause(); } setException(e); - LOG.error("Failed", e); + log.error("Failed", e); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index dc6d31a9cb..8eb49583f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -57,8 +57,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * ColumnStatsUpdateTask implementation. For example, ALTER TABLE src_stat @@ -70,8 +68,6 @@ public class ColumnStatsUpdateTask extends Task { private static final long serialVersionUID = 1L; - private static transient final Logger LOG = LoggerFactory - .getLogger(ColumnStatsUpdateTask.class); private ColumnStatistics constructColumnStatsFromInput() throws SemanticException, MetaException { @@ -79,7 +75,7 @@ private ColumnStatistics constructColumnStatsFromInput() // If we are replicating the stats, we don't need to construct those again. if (work.getColStats() != null) { ColumnStatistics colStats = work.getColStats(); - LOG.debug("Got stats through replication for " + + log.debug("Got stats through replication for " + colStats.getStatsDesc().getDbName() + "." 
+ colStats.getStatsDesc().getTableName()); return colStats; @@ -362,7 +358,7 @@ public int execute() { return persistColumnStats(db); } catch (Exception e) { setException(e); - LOG.info("Failed to persist stats in metastore", e); + log.info("Failed to persist stats in metastore", e); } return 1; } @@ -384,7 +380,7 @@ private Date readDateValue(String dateStr) { return new Date(writableVal.getDays()); } catch (IllegalArgumentException err) { // Fallback to integer parsing - LOG.debug("Reading date value as days since epoch: {}", dateStr); + log.debug("Reading date value as days since epoch: {}", dateStr); return new Date(Long.parseLong(dateStr)); } } @@ -395,7 +391,7 @@ private Timestamp readTimestampValue(String timestampStr) { org.apache.hadoop.hive.common.type.Timestamp.valueOf(timestampStr)); return new Timestamp(writableVal.getSeconds()); } catch (IllegalArgumentException err) { - LOG.debug("Reading timestamp value as seconds since epoch: {}", timestampStr); + log.debug("Reading timestamp value as seconds since epoch: {}", timestampStr); return new Timestamp(Long.parseLong(timestampStr)); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java index 1929fab943..d36d266dc6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java @@ -37,12 +37,9 @@ import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ExplainSQRewriteTask extends Task implements Serializable { private static final long serialVersionUID = 1L; - private final Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); @Override public StageType getType() { @@ -77,7 +74,7 @@ public int execute() { } catch (Exception e) { setException(e); - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + log.error("Error executing task", e); return (1); } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index 750abcb6a6..8295ce0bb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -80,7 +80,6 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -90,7 +89,6 @@ * **/ public class ExplainTask extends Task implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(ExplainTask.class.getName()); public static final String STAGE_DEPENDENCIES = "STAGE DEPENDENCIES"; private static final long serialVersionUID = 1L; @@ -457,7 +455,7 @@ public int execute() { jsonParser.print(jsonPlan, out); } catch (Exception e) { // if there is anything wrong happen, we bail out. - LOG.error("Running explain user level has problem." + + log.error("Running explain user level has problem." 
+ " Falling back to normal explain.", e); work.getConfig().setFormatted(false); work.getConfig().setUserLevelExplain(false); @@ -470,7 +468,7 @@ public int execute() { JsonParser jsonParser = JsonParserFactory.getParser(conf); if (jsonParser != null) { jsonParser.print(jsonPlan, null); - LOG.info("JsonPlan is augmented to {}", jsonPlan); + log.info("JsonPlan is augmented to {}", jsonPlan); } out.print(jsonPlan); } @@ -482,7 +480,7 @@ public int execute() { return (0); } catch (Exception e) { - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + log.error("Error", e); setException(e); return (1); } @@ -1337,7 +1335,7 @@ public static String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, ret = baos.toString(); } } catch (Exception e) { - LOG.warn("Exception generating explain output: " + e, e); + LoggerFactory.getLogger(ExplainTask.class).warn("Exception generating explain output", e); } return ret; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index d3e94134ab..b593a44fae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -28,7 +28,6 @@ Licensed to the Apache Software Foundation (ASF) under one public class ExportTask extends Task implements Serializable { private static final long serialVersionUID = 1L; - private Logger LOG = LoggerFactory.getLogger(ExportTask.class); public ExportTask() { super(); @@ -46,13 +45,13 @@ public int execute() { TableExport.Paths exportPaths = new TableExport.Paths( work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); - LOG.debug("Exporting data to: {}", exportPaths.metadataExportRootDir()); + log.debug("Exporting data to: {}", exportPaths.metadataExportRootDir()); work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); tableExport.write(true); } catch (Exception e) { - LOG.error("failed", e); + log.error("Failed to execute task", e); setException(e); return 1; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 502d41d44d..151c3dac32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -22,8 +22,6 @@ import java.io.Serializable; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.TaskQueue; import org.apache.hadoop.hive.ql.QueryPlan; @@ -50,7 +48,6 @@ private FetchOperator fetch; private ListSinkOperator sink; private int totalRows; - private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class); JobConf job = null; public FetchTask() { @@ -95,7 +92,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, TaskQueue tas } catch (Exception e) { // Bail out ungracefully - we should never hit // this here - but would have hit it in SemanticAnalyzer - LOG.error("Initialize failed", e); + log.error("Initialize failed", e); throw new RuntimeException(e); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 51de87f2fd..1ae8561661 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -63,8 +63,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -81,7 +79,6 @@ public class MoveTask extends Task implements Serializable { private static final long serialVersionUID = 1L; - private static transient final Logger LOG = LoggerFactory.getLogger(MoveTask.class); public MoveTask() { super(); @@ -121,13 +118,13 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf) try { tgtFs = targetPath.getFileSystem(conf); } catch (IOException e) { - LOG.error("Failed to get dest fs", e); + log.error("Failed to get dest fs", e); throw new HiveException(e.getMessage(), e); } try { srcFs = sourcePath.getFileSystem(conf); } catch (IOException e) { - LOG.error("Failed to get src fs", e); + log.error("Failed to get src fs", e); throw new HiveException(e.getMessage(), e); } @@ -151,8 +148,8 @@ private void moveFileInDfs (Path sourcePath, Path targetPath, HiveConf conf) tgtFs.delete(deletePath, true); } } catch (IOException e) { - LOG.info("Unable to delete the path created for facilitating rename: {}", - deletePath); + log.info("Unable to delete the path created for facilitating rename: {}", + deletePath, e); } throw new HiveException("Unable to rename: " + sourcePath + " to: " + targetPath); @@ -223,7 +220,7 @@ private Path createTargetPath(Path targetPath, FileSystem fs) throws IOException private void releaseLocks(LoadTableDesc ltd) throws HiveException { // nothing needs to be done if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY)) { - LOG.debug("No locks to release because Hive concurrency support is not enabled"); + log.debug("No locks to release because Hive concurrency support is not enabled"); return; } @@ -236,11 +233,11 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { WriteEntity output = context.getLoadTableOutputMap().get(ltd); List lockObjects = context.getOutputLockObjects().get(output); if (CollectionUtils.isEmpty(lockObjects)) { - LOG.debug("No locks found to release"); + log.debug("No locks found to release"); return; } - LOG.info("Releasing {} locks", lockObjects.size()); + log.info("Releasing {} locks", lockObjects.size()); for (HiveLockObj lockObj : lockObjects) { List locks = lockMgr.getLocks(lockObj.getObj(), false, true); for (HiveLock lock : locks) { @@ -251,7 +248,7 @@ private void releaseLocks(LoadTableDesc ltd) throws HiveException { } catch (LockException le) { // should be OK since the lock is ephemeral and will eventually be deleted // when the query finishes and zookeeper session is closed. 
- LOG.warn("Could not release lock {}", lock.getHiveLockObject().getName(), le); + log.warn("Could not release lock {}", lock.getHiveLockObject().getName(), le); } } } @@ -332,7 +329,7 @@ public int execute() { List newFiles = new ArrayList<>(); Hive.moveAcidFiles(srcFs, srcs, targetPath, newFiles); } else { - LOG.debug("No files found to move from " + sourcePath + " to " + targetPath); + log.debug("No files found to move from " + sourcePath + " to " + targetPath); } } else { @@ -425,7 +422,7 @@ public int execute() { getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); } } else { - LOG.info("Partition is: {}", tbd.getPartitionSpec()); + log.info("Partition is: {}", tbd.getPartitionSpec()); // Check if the bucketing and/or sorting columns were inferred TaskInformation ti = new TaskInformation(this, tbd.getSourcePath().toUri().toString()); @@ -580,7 +577,7 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, String loadTime = "\t Time taken to load dynamic partitions: " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"; console.printInfo(loadTime); - LOG.info(loadTime); + log.info(loadTime); if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { throw new HiveException("This query creates no partitions." + @@ -623,7 +620,7 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } - LOG.info("Loading partition " + entry.getKey()); + log.info("Loading partition " + entry.getKey()); } console.printInfo("\t Time taken for adding to write entity : " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); @@ -728,7 +725,7 @@ private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT); } } else { - LOG.warn("Skipping file format check as dpCtx is not null"); + log.warn("Skipping file format check as dpCtx is not null"); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 66c3ced91a..2d5fff5b47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; @@ -56,8 +55,6 @@ private static final long serialVersionUID = 1L; - private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class); - public ReplCopyTask(){ super(); } @@ -73,12 +70,12 @@ private void updateSrcFileListForDupCopy(FileSystem dstFs, Path toPath, List{}", oneSrc.getPath(), toPath); + log.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null)); } if (work.isCopyToMigratedTxnTable()) { @@ -205,12 +202,12 @@ public int execute() { } } - LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + log.debug("ReplCopyTask numFiles: {}", srcFiles.size()); // in case of move optimization, file is directly copied to destination. 
So we need to clear the old content, if // its a replace (insert overwrite ) operation. if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { - LOG.debug(" path " + toPath + " is cleaned before renaming"); + log.debug(" path " + toPath + " is cleaned before renaming"); getHive().cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(), work.getIsAutoPurge()); } @@ -229,7 +226,7 @@ public int execute() { copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles); return 0; } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); + log.error("Error", e); setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -241,7 +238,7 @@ private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List filesInFileListing(FileSystem fs, Path dataPath) throws IOException { Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); - LOG.debug("ReplCopyTask filesInFileListing() reading {}", fileListing.toUri()); + log.debug("ReplCopyTask filesInFileListing() reading {}", fileListing.toUri()); if (! fs.exists(fileListing)){ - LOG.debug("ReplCopyTask : _files does not exist"); + log.debug("ReplCopyTask : _files does not exist"); return null; // Returning null from this fn can serve as an err condition. // On success, but with nothing to return, we can return an empty list. } @@ -283,7 +280,7 @@ private Path getModifiedToPath(Path toPath) { String line; while ((line = br.readLine()) != null) { - LOG.debug("ReplCopyTask :_filesReadLine: {}", line); + log.debug("ReplCopyTask :_filesReadLine: {}", line); String[] fragments = ReplChangeManager.decodeFileUri(line); try { @@ -292,7 +289,7 @@ private Path getModifiedToPath(Path toPath) { filePaths.add(f); } catch (MetaException e) { // issue warning for missing file and throw exception - LOG.warn("Cannot find {} in source repo or cmroot", fragments[0]); + log.warn("Cannot find {} in source repo or cmroot", fragments[0]); throw new IOException(e.getMessage()); } // Note - we need srcFs rather than fs, because it is possible that the _files lists files @@ -338,7 +335,8 @@ public String getName() { boolean copyToMigratedTxnTable, boolean readSourceAsFileList, boolean overWrite) { Task copyTask = null; - LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); + final Logger loadCopyLogger = LoggerFactory.getLogger(ReplCopyTask.class); + loadCopyLogger.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite); rcwork.setReadSrcAsFilesList(readSourceAsFileList); @@ -352,12 +350,12 @@ public String getName() { // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple // replace events getting replayed in the first incremental load. 
rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace()); - LOG.debug("ReplCopyTask:\trcwork"); + loadCopyLogger.debug("ReplCopyTask:\trcwork"); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); rcwork.setDistCpDoAsUser(distCpDoAsUser); copyTask = TaskFactory.get(rcwork, conf); } else { - LOG.debug("ReplCopyTask:\tcwork"); + loadCopyLogger.debug("ReplCopyTask:\tcwork"); copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf); } return copyTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java index 9131aeee81..780283e847 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -58,7 +58,7 @@ public int execute() { tbl = Hive.get().getTable(work.getDbName(), tableName); if (!replicationSpec.allowReplacementInto(tbl.getParameters())) { // if the event is already replayed, then no need to replay it again. - LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " + + log.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " + replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType()); return 0; } @@ -68,16 +68,16 @@ public int execute() { Database database = Hive.get().getDatabase(work.getDbName()); if (!replicationSpec.allowReplacementInto(database.getParameters())) { // if the event is already replayed, then no need to replay it again. - LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " + + log.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " + replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType()); return 0; } } catch (HiveException e1) { - LOG.error("Get database failed with exception " + e1.getMessage()); + log.error("Get database failed with exception " + e1.getMessage()); return 1; } } catch (HiveException e) { - LOG.error("Get table failed with exception " + e.getMessage()); + log.error("Get table failed with exception " + e.getMessage()); return 1; } } @@ -89,7 +89,7 @@ public int execute() { case REPL_OPEN_TXN: List txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user); assert txnIds.size() == work.getTxnIds().size(); - LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " + + log.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " + work.getTxnIds().toString() + " and target txn id " + txnIds.toString()); return 0; case REPL_MIGRATION_OPEN_TXN: @@ -97,20 +97,20 @@ public int execute() { if (txnManager.isTxnOpen()) { long txnId = txnManager.getCurrentTxnId(); txnManager.commitTxn(); - LOG.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId); + log.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId); } Long txnIdMigration = txnManager.openTxn(context, user); long writeId = txnManager.getTableWriteId(work.getDbName(), work.getTableName()); String validTxnList = txnManager.getValidTxns().toString(); conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList); conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(writeId)); - LOG.info("Started open txn for migration : " + txnIdMigration + " with valid txn list : " + + log.info("Started open txn for migration : " + txnIdMigration + " with valid txn list : " + validTxnList + " and write id " + writeId); return 0; case REPL_ABORT_TXN: for (long txnId : 
work.getTxnIds()) { txnManager.replRollbackTxn(replPolicy, txnId); - LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId); + log.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId); } return 0; case REPL_MIGRATION_COMMIT_TXN: @@ -121,7 +121,7 @@ public int execute() { txnManager.replCommitTxn(commitTxnRequestMigr); conf.unset(ValidTxnList.VALID_TXNS_KEY); conf.unset(ReplUtils.REPL_CURRENT_TBL_WRITE_ID); - LOG.info("Committed Migration Txn with replLastIdInfo: " + work.getReplLastIdInfo() + " for txnId: " + + log.info("Committed Migration Txn with replLastIdInfo: " + work.getReplLastIdInfo() + " for txnId: " + txnIdMigrationCommit); return 0; case REPL_COMMIT_TXN: @@ -133,7 +133,7 @@ public int execute() { commitTxnRequest.setReplPolicy(work.getReplPolicy()); commitTxnRequest.setWriteEventInfos(work.getWriteEventInfos()); txnManager.replCommitTxn(commitTxnRequest); - LOG.info("Replayed CommitTxn Event for replPolicy: " + replPolicy + " with srcTxn: " + txnId + + log.info("Replayed CommitTxn Event for replPolicy: " + replPolicy + " with srcTxn: " + txnId + "WriteEventInfos: " + work.getWriteEventInfos()); return 0; case REPL_ALLOC_WRITE_ID: @@ -141,17 +141,17 @@ public int execute() { String dbName = work.getDbName(); List txnToWriteIdList = work.getTxnToWriteIdList(); txnManager.replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, txnToWriteIdList); - LOG.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName + + log.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName + " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tableName); return 0; case REPL_WRITEID_STATE: txnManager.replTableWriteIdState(work.getValidWriteIdList(), work.getDbName(), tableName, work.getPartNames()); - LOG.info("Replicated WriteId state for DbName: " + work.getDbName() + + log.info("Replicated WriteId state for DbName: " + work.getDbName() + " TableName: " + tableName + " ValidWriteIdList: " + work.getValidWriteIdList()); return 0; default: - LOG.error("Operation Type " + work.getOperationType() + " is not supported "); + log.error("Operation Type " + work.getOperationType() + " is not supported "); return 1; } } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 393d21dd0c..db8e97639a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.TaskQueue; import org.apache.hadoop.hive.ql.QueryPlan; @@ -54,7 +53,6 @@ public class StatsTask extends Task implements Serializable { private static final long serialVersionUID = 1L; - private static transient final Logger LOG = LoggerFactory.getLogger(StatsTask.class); public StatsTask() { super(); @@ -110,7 +108,7 @@ public int execute() { } } } catch (Exception e) { - LOG.error("Failed to run stats task", e); + log.error("Failed to run stats task", e); setException(e); return 1; } @@ -152,7 +150,8 @@ public static ExecutorService newThreadPool(HiveConf conf) { int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_STATS_GATHER_NUM_THREADS); ExecutorService executor = 
Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("StatsNoJobTask-Thread-%d").build()); - LOG.info("Initialized threadpool for stats computation with {} threads", numThreads); + LoggerFactory.getLogger(StatsTask.class).info("Initialized threadpool for stats computation with {} threads", + numThreads); return executor; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 86a2964858..110b7cefd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -55,6 +55,9 @@ public abstract class Task implements Serializable, Node { private static final long serialVersionUID = 1L; + + protected final transient Logger log = LoggerFactory.getLogger(getClass()); + public transient HashMap taskCounters; public transient TaskHandle taskHandle; protected transient HiveConf conf; @@ -67,7 +70,6 @@ protected transient String jobID; protected Task backupTask; protected List> backupChildrenTasks = new ArrayList>(); - protected static transient Logger LOG = LoggerFactory.getLogger(Task.class); protected int taskTag; private boolean isLocalMode =false; @@ -166,7 +168,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, TaskQueue tas } this.taskQueue = taskQueue; this.context = context; - this.console = new LogHelper(LOG); + this.console = new LogHelper(log); } public void setQueryDisplay(QueryDisplay queryDisplay) { this.queryDisplay = queryDisplay; @@ -189,7 +191,7 @@ protected Hive getHive() { // The conf object in HMS client is always different from the one used here. return Hive.get(conf); } catch (HiveException e) { - LOG.error(StringUtils.stringifyException(e)); + log.error(StringUtils.stringifyException(e)); throw new RuntimeException(e); } } @@ -208,7 +210,7 @@ public int executeTask(HiveHistory hiveHistory) { } if (conf != null) { - LOG.debug("Task getting executed using mapred tag : " + conf.get(MRJobConfig.JOB_TAGS)); + log.debug("Task getting executed using mapred tag : " + conf.get(MRJobConfig.JOB_TAGS)); } int retval = execute(); this.setDone(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 8a8822d560..60b76af808 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -123,8 +122,6 @@ private transient boolean isShutdown = false; private transient boolean jobKilled = false; - protected static transient final Logger LOG = LoggerFactory.getLogger(ExecDriver.class); - private RunningJob rj; /** @@ -132,7 +129,7 @@ */ public ExecDriver() { super(); - console = new LogHelper(LOG); + console = new LogHelper(log); job = new JobConf(ExecDriver.class); this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this); } @@ -191,7 +188,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, TaskQueue tas public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveException { setWork(plan); this.job = job; - console = new 
LogHelper(LOG, isSilent); + console = new LogHelper(log, isSilent); this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this); } @@ -230,7 +227,7 @@ public int execute() { JobClient jc = null; if (taskQueue.isShutdown()) { - LOG.warn("Task was cancelled"); + log.warn("Task was cancelled"); return 5; } @@ -289,7 +286,7 @@ public int execute() { inpFormat = BucketizedHiveInputFormat.class.getName(); } - LOG.info("Using " + inpFormat); + log.info("Using " + inpFormat); try { job.setInputFormat(JavaUtils.loadClass(inpFormat)); @@ -334,19 +331,19 @@ public int execute() { CompressionUtils.tar(localPath.toUri().getPath(), fileNames,archiveFileName); Path archivePath = Utilities.generateTarPath(localPath, stageId); - LOG.info("Archive "+ hashtableFiles.length+" hash table files to " + archivePath); + log.info("Archive "+ hashtableFiles.length+" hash table files to " + archivePath); //upload archive file to hdfs Path hdfsFilePath =Utilities.generateTarPath(hdfsPath, stageId); short replication = (short) job.getInt("mapred.submit.replication", 10); hdfs.copyFromLocalFile(archivePath, hdfsFilePath); hdfs.setReplication(hdfsFilePath, replication); - LOG.info("Upload 1 archive file from" + archivePath + " to: " + hdfsFilePath); + log.info("Upload 1 archive file from" + archivePath + " to: " + hdfsFilePath); //add the archive file to distributed cache DistributedCache.createSymlink(job); DistributedCache.addCacheArchive(hdfsFilePath.toUri(), job); - LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri()); + log.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri()); } } work.configureJobConf(job); @@ -364,7 +361,7 @@ public int execute() { rWork.setNumReduceTasks(1); job.setNumReduceTasks(1); } catch (Exception e) { - LOG.error("Sampling error", e); + log.error("Sampling error", e); console.printError(e.toString(), "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); rWork.setNumReduceTasks(1); @@ -374,7 +371,7 @@ public int execute() { jc = new JobClient(job); // make this client wait if job tracker is not behaving well. - Throttle.checkJobTracker(job, LOG); + Throttle.checkJobTracker(job, log); if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) { // initialize stats publishing table @@ -410,14 +407,14 @@ public int execute() { HiveConfUtil.updateJobCredentialProviders(job); // Finally SUBMIT the JOB! 
if (taskQueue.isShutdown()) { - LOG.warn("Task was cancelled"); + log.warn("Task was cancelled"); return 5; } rj = jc.submitJob(job); if (taskQueue.isShutdown()) { - LOG.warn("Task was cancelled"); + log.warn("Task was cancelled"); killJob(); return 5; } @@ -458,7 +455,7 @@ public int execute() { jc.close(); } } catch (Exception e) { - LOG.warn("Failed while cleaning up ", e); + log.warn("Failed while cleaning up ", e); } finally { HadoopJobExecHelper.runningJobs.remove(rj); } @@ -715,8 +712,7 @@ public static void main(String[] args) throws IOException, HiveException { setupChildLog4j(conf); } - Logger LOG = LoggerFactory.getLogger(ExecDriver.class.getName()); - LogHelper console = new LogHelper(LOG, isSilent); + LogHelper console = new LogHelper(LoggerFactory.getLogger(ExecDriver.class), isSilent); if (planFileName == null) { console.printError("Must specify Plan File Name"); @@ -888,7 +884,7 @@ private void killJob() { try { rj.killJob(); } catch (Exception e) { - LOG.warn("failed to kill job " + rj.getID(), e); + log.warn("failed to kill job " + rj.getID(), e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index b7bb7d0af6..8a01e018e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -122,8 +122,8 @@ public int execute() { // at this point the number of reducers is precisely defined in the plan int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks(); - if (LOG.isDebugEnabled()) { - LOG.debug("Task: " + getId() + ", Summary: " + + if (log.isDebugEnabled()) { + log.debug("Task: " + getId() + ", Summary: " + totalInputFileSize + "," + totalInputNumFiles + "," + numReducers); } @@ -184,7 +184,7 @@ public int execute() { // write out the plan to a local file Path planPath = new Path(ctx.getLocalTmpPath(), "plan.xml"); MapredWork plan = getWork(); - LOG.info("Generating plan file " + planPath.toString()); + log.info("Generating plan file " + planPath.toString()); OutputStream out = null; try { @@ -226,7 +226,7 @@ public int execute() { } } - LOG.info("Executing: " + cmdLine); + log.info("Executing: " + cmdLine); // Inherit Java system variables String hadoopOpts; @@ -307,17 +307,17 @@ public int execute() { errPrinter.join(); if (exitVal != 0) { - LOG.error("Execution failed with exit status: " + exitVal); + log.error("Execution failed with exit status: " + exitVal); if (SessionState.get() != null) { SessionState.get().addLocalMapRedErrors(getId(), errPrintStream.getOutput()); } } else { - LOG.info("Execution completed successfully"); + log.info("Execution completed successfully"); } return exitVal; } catch (Exception e) { - LOG.error("Got exception", e); + log.error("Got exception", e); return (1); } finally { try { @@ -328,7 +328,7 @@ public int execute() { } } catch (Exception e) { - LOG.error("Exception: ", e); + log.error("Exception: ", e); } } } @@ -522,7 +522,7 @@ public void updateWebUiStats(MapRedStats mapRedStats, RunningJob rj) { try { queryDisplay.updateTaskStatistics(mapRedStats, rj, getId()); } catch (IOException | JSONException e) { - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e), e); + log.error("Error", e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 78e3fd57f4..e75d6268f9 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -81,8 +81,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.StreamPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * MapredLocalTask represents any local work (i.e.: client side work) that hive needs to @@ -99,10 +97,8 @@ private final Map fetchOperators = new HashMap(); protected HadoopJobExecHelper jobExecHelper; private JobConf job; - public static transient final Logger l4j = LoggerFactory.getLogger(MapredLocalTask.class); static final String HIVE_LOCAL_TASK_CHILD_OPTS_KEY = "HIVE_LOCAL_TASK_CHILD_OPTS"; public static MemoryMXBean memoryMXBean; - private static final Logger LOG = LoggerFactory.getLogger(MapredLocalTask.class); // not sure we need this exec context; but all the operators in the work // will pass this context throught @@ -118,7 +114,7 @@ public MapredLocalTask() { public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) throws HiveException { setWork(plan); this.job = job; - console = new LogHelper(LOG, isSilent); + console = new LogHelper(log, isSilent); } public void setExecContext(ExecMapperContext execContext) { @@ -172,7 +168,7 @@ private int executeInChildVM() { // write out the plan to a local file Path planPath = new Path(context.getLocalTmpPath(), "plan.xml"); MapredLocalWork plan = getWork(); - LOG.info("Generating plan file " + planPath.toString()); + log.info("Generating plan file " + planPath.toString()); OutputStream out = null; try { @@ -257,7 +253,7 @@ private int executeInChildVM() { // it also runs with hadoop permissions for the user the job is running as // This will be used by hadoop only in unsecure(/non kerberos) mode String endUserName = Utils.getUGI().getShortUserName(); - LOG.debug("setting HADOOP_USER_NAME\t" + endUserName); + log.debug("setting HADOOP_USER_NAME\t" + endUserName); variables.put("HADOOP_USER_NAME", endUserName); if (variables.containsKey(HADOOP_OPTS_KEY)) { @@ -316,10 +312,10 @@ private int executeInChildVM() { String name = entry.getKey(); String value = entry.getValue(); env[pos++] = name + "=" + value; - LOG.debug("Setting env: " + name + "=" + LogUtils.maskIfPassword(name, value)); + log.debug("Setting env: " + name + "=" + LogUtils.maskIfPassword(name, value)); } - LOG.info("Executing: " + cmdLine); + log.info("Executing: " + cmdLine); // Run ExecDriver in another JVM executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir)); @@ -328,10 +324,10 @@ private int executeInChildVM() { LogRedirector.redirect( Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stdout", - new LogRedirector(executor.getInputStream(), LOG, callback)); + new LogRedirector(executor.getInputStream(), log, callback)); LogRedirector.redirect( Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stderr", - new LogRedirector(executor.getErrorStream(), LOG, callback)); + new LogRedirector(executor.getErrorStream(), log, callback)); CachingPrintStream errPrintStream = new CachingPrintStream(System.err); @@ -348,17 +344,17 @@ private int executeInChildVM() { errPrinter.join(); if (exitVal != 0) { - LOG.error("Execution failed with exit status: " + exitVal); + log.error("Execution failed with exit status: " + exitVal); if (SessionState.get() != null) { SessionState.get().addLocalMapRedErrors(getId(), errPrintStream.getOutput()); } } else { - 
LOG.info("Execution completed successfully"); + log.info("Execution completed successfully"); } return exitVal; } catch (Exception e) { - LOG.error("Exception: ", e); + log.error("Exception: ", e); return (1); } finally { if (secureDoAs != null) { @@ -402,7 +398,7 @@ public int executeInProcess() { message = "Hive Runtime Error: Map local work failed"; retVal = 2; } - l4j.error(message, throwable); + log.error(message, throwable); console.printError(message, HiveStringUtils.stringifyException(throwable)); return retVal; } @@ -466,7 +462,7 @@ private void startForward(boolean inputFileChangeSenstive, String bigTableBucket private void initializeOperators(Map fetchOpJobConfMap) throws HiveException { for (Map.Entry> entry : work.getAliasToWork().entrySet()) { - LOG.debug("initializeOperators: " + entry.getKey() + ", children = " + entry.getValue().getChildOperators()); + log.debug("initializeOperators: " + entry.getKey() + ", children = " + entry.getValue().getChildOperators()); } // this mapper operator is used to initialize all the operators for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) { @@ -490,7 +486,7 @@ private void initializeOperators(Map fetchOpJobConfMap) FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); fetchOpJobConfMap.put(fetchOp, jobClone); fetchOperators.put(entry.getKey(), fetchOp); - l4j.info("fetchoperator for " + entry.getKey() + " created"); + log.info("fetchoperator for " + entry.getKey() + " created"); } // initialize all forward operator for (Map.Entry entry : fetchOperators.entrySet()) { @@ -510,7 +506,7 @@ private void initializeOperators(Map fetchOpJobConfMap) // initialize the forward operator ObjectInspector objectInspector = fetchOp.getOutputObjectInspector(); forwardOp.initialize(jobConf, new ObjectInspector[] {objectInspector}); - l4j.info("fetchoperator for " + entry.getKey() + " initialized"); + log.info("fetchoperator for " + entry.getKey() + " initialized"); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java index e8a8df1e12..c2893933b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java @@ -27,8 +27,6 @@ import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import java.security.PrivilegedExceptionAction; @@ -42,7 +40,6 @@ * DirCopyTask, mainly to be used to copy External table data. */ public class DirCopyTask extends Task implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class); private static final int MAX_COPY_RETRY = 5; private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException { @@ -61,10 +58,10 @@ private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOE status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath); } catch (FileNotFoundException e) { // Don't delete target path created else ddl task will try to create it using user hive and may fail. 
- LOG.warn("source path missing " + sourcePath); + log.warn("source path missing " + sourcePath, e); return createdDir; } - LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}", + log.info("Setting permission for path dest {} from source {} owner {} : {} : {}", destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission()); destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup()); destPath.getFileSystem(conf).setPermission(destPath, status.getPermission()); @@ -89,34 +86,34 @@ private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser private int handleException(Exception e, Path sourcePath, Path targetPath, int currentRetry, UserGroupInformation proxyUser) { try { - LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e); + log.info("Checking if source path " + sourcePath + " is missing for exception", e); if (!checkIfPathExist(sourcePath, proxyUser)) { - LOG.info("Source path is missing. Ignoring exception."); + log.info("Source path is missing. Ignoring exception."); return 0; } } catch (Exception ex) { - LOG.warn("Source path missing check failed. ", ex); + log.warn("Source path missing check failed", ex); } // retry logic only for i/o exception if (!(e instanceof IOException)) { - LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e); + log.error("Unable to copy {} to {}", sourcePath, targetPath, e); setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } if (currentRetry <= MAX_COPY_RETRY) { - LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e); + log.warn("Unable to copy {} to {}", sourcePath, targetPath, e); } else { - LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); + log.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); setException(e); return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode(); } int sleepTime = FileUtils.getSleepTime(currentRetry); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); + log.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); try { Thread.sleep(sleepTime); } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); + log.info("Sleep interrupted", e); } try { if (proxyUser == null) { @@ -124,7 +121,7 @@ private int handleException(Exception e, Path sourcePath, Path targetPath, } FileSystem.closeAllForUGI(proxyUser); } catch (Exception ex) { - LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex); + log.warn("Unable to closeAllForUGI for user " + proxyUser, ex); } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -176,7 +173,7 @@ public int execute() { try { FileSystem.closeAllForUGI(proxyUser); } catch (IOException e) { - LOG.error("Unable to closeAllForUGI for user " + proxyUser, e); + log.error("Unable to closeAllForUGI for user " + proxyUser, e); if (error == 0) { setException(e); error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index a7fd0ef2fa..6a144c3c3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -75,8 +75,6 @@ import 
org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; import java.io.BufferedReader; @@ -124,7 +122,6 @@ public String getPrefix() { } } - private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); private ReplLogger replLogger; @Override @@ -163,11 +160,11 @@ public int execute() { work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); initiateDataCopyTasks(); } else { - LOG.info("Previous Dump is not yet loaded"); + log.info("Previous Dump is not yet loaded"); } } } catch (Exception e) { - LOG.error("failed", e); + log.error("failed", e); setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -177,7 +174,7 @@ public int execute() { private void initiateAuthorizationDumpTask() throws SemanticException { if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { Path rangerDumpRoot = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_RANGER_BASE_DIR); - LOG.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot); + log.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot); RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern); Task rangerDumpTask = TaskFactory.get(rangerDumpWork, conf); if (childTasks == null) { @@ -204,7 +201,7 @@ private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOExc Path lastDumpPath = getLatestDumpPath(dumpRoot); if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, isBootstrap)) { //Resume previous dump - LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath); + log.info("Resuming the dump with existing dump directory {}", lastDumpPath); work.setShouldOverwrite(true); return lastDumpPath; } else { @@ -237,9 +234,9 @@ private void finishRemainingTasks() throws SemanticException { } private void prepareReturnValues(List values) throws SemanticException { - LOG.debug("prepareReturnValues : " + dumpSchema); + log.debug("prepareReturnValues : " + dumpSchema); for (String s : values) { - LOG.debug(" > " + s); + log.debug(" > " + s); } Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf); } @@ -256,7 +253,7 @@ private void deleteAllPreviousDumpMeta(Path currentDumpPath) { } } } catch (Exception ex) { - LOG.warn("Possible leak on disk, could not delete the previous dump directory:" + currentDumpPath, ex); + log.warn("Possible leak on disk, could not delete the previous dump directory:" + currentDumpPath, ex); } } @@ -287,7 +284,7 @@ private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException if (fs.exists(dumpRoot)) { FileStatus[] statuses = fs.listStatus(dumpRoot); for (FileStatus status : statuses) { - LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + log.info("Evaluating previous dump dir path:{}", status.getPath()); if (latestValidStatus == null) { latestValidStatus = validDump(status.getPath()) ? status : null; } else if (validDump(status.getPath()) @@ -298,7 +295,7 @@ private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException } Path latestDumpDir = (latestValidStatus == null) ? 
null : new Path(latestValidStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); - LOG.info("Selecting latest valid dump dir as {}", (latestDumpDir == null) ? "null" : latestDumpDir.toString()); + log.info("Selecting latest valid dump dir as {}", (latestDumpDir == null) ? "null" : latestDumpDir.toString()); return latestDumpDir; } @@ -439,7 +436,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (needBootstrapAcidTablesDuringIncrementalDump()) { bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0); - LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", + log.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", work.dbNameOrPattern); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); @@ -485,7 +482,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive replLogger.startLog(); long dumpedCount = resumeFrom - work.eventFrom; if (dumpedCount > 0) { - LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); + log.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); } cleanFailedEventDirIfExists(dumpRoot, resumeFrom); while (evIter.hasNext()) { @@ -501,7 +498,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive replLogger.endLog(lastReplId.toString()); - LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); + log.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot); // If repl policy is changed (oldReplScope is set), then pass the current replication policy, @@ -556,7 +553,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } catch (InvalidTableException te) { // Repl dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. - LOG.debug(te.getMessage()); + log.debug(te.getMessage()); } } } @@ -573,7 +570,7 @@ private int getMaxEventAllowed(int currentEventMaxLimit) { if (maxDirItems > 0) { maxDirItems = maxDirItems - ReplUtils.RESERVED_DIR_ITEMS_COUNT; if (maxDirItems < currentEventMaxLimit) { - LOG.warn("Changing the maxEventLimit from {} to {} as the '" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + log.warn("Changing the maxEventLimit from {} to {} as the '" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "' limit encountered. Set this config appropriately to increase the maxEventLimit", currentEventMaxLimit, maxDirItems); currentEventMaxLimit = maxDirItems; @@ -666,7 +663,7 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St // Empty list will create an empty file to distinguish it from db level replication. If no file is there, that means // db level replication. If empty file is there, means no table satisfies the policy. 
if (tableList == null) { - LOG.debug("Table list file is not created for db level replication."); + log.debug("Table list file is not created for db level replication."); return; } @@ -686,25 +683,25 @@ writer.close(); break; } catch (IOException e) { - LOG.info("File operation failed", e); + log.info("File operation failed", e); if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) { //no need to wait in the last iteration - LOG.error("File " + tableListFile.toUri() + " creation failed even after " + + log.error("File " + tableListFile.toUri() + " creation failed even after " + FileUtils.MAX_IO_ERROR_RETRY + " attempts."); throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); } int sleepTime = FileUtils.getSleepTime(count); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1)); + log.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1)); try { Thread.sleep(sleepTime); } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); + log.info("Sleep interrupted", timerEx); } FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI()); } count++; } - LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); + log.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } private List dirLocationsToCopy(List sourceLocations) @@ -731,7 +728,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) assert (bootDumpBeginReplId >= 0L); List tableList; - LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); + log.info("Bootstrap Dump for db {}", work.dbNameOrPattern); List extTableCopyWorks = new ArrayList<>(); List managedTableCopyPaths = new ArrayList<>(); long timeoutInMs = HiveConf.getTimeVar(conf, @@ -745,7 +742,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) metadataPath.getFileSystem(conf).delete(metadataPath, true); } for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) { - LOG.debug("Dumping db: " + dbName); + log.debug("Dumping db: " + dbName); // TODO : Currently we don't support separate table list for each database. tableList = work.replScope.includeAllTables() ? null : new ArrayList<>(); Database db = hiveDb.getDatabase(dbName); @@ -768,7 +765,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) try (Writer writer = new Writer(dbRoot, conf)) { List extTableLocations = new LinkedList<>(); for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { - LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); + log.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); Table table = null; try { @@ -776,7 +773,7 @@ table = tableTuple != null ?
tableTuple.object : null; if (shouldDumpExternalTableLocation() && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) { - LOG.debug("Adding table {} to external tables list", tblName); + log.debug("Adding table {} to external tables list", tblName); extTableLocations.addAll(writer.dataLocationDump(tableTuple.object)); } managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot, @@ -785,7 +782,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. - LOG.debug(te.getMessage()); + log.debug(te.getMessage()); } dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb); if (tableList != null && isTableSatifiesConfig(table)) { @@ -803,7 +800,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) if (caught == null) { throw e; } else { - LOG.error("failed to reset the db state for " + uniqueKey + log.error("failed to reset the db state for " + uniqueKey + " on failure of repl dump", e); throw caught; } @@ -815,7 +812,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) replLogger.endLog(bootDumpBeginReplId.toString()); } Long bootDumpEndReplId = currentNotificationId(hiveDb); - LOG.info("Preparing to return {},{}->{}", + log.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(true); @@ -829,7 +826,7 @@ private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) { try { return dumpMetaData.getEventFrom() != null; } catch (Exception e) { - LOG.info("No previous dump present"); + log.info("No previous dump present"); return false; } } @@ -848,7 +845,7 @@ private boolean shouldResumePreviousDump(Path lastDumpPath, boolean isBootStrap) try { resumeFrom = getResumeFrom(lastEventFile); } catch (SemanticException ex) { - LOG.info("Could not get last repl id from {}, because of:", lastEventFile, ex.getMessage()); + log.info("Could not get last repl id from {}, because of:", lastEventFile, ex.getMessage()); } return resumeFrom > 0L; } @@ -870,7 +867,7 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive List dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata, Path dbRootData, long lastReplId, Hive hiveDb, HiveWrapper.Tuple tuple) throws Exception { - LOG.info("Bootstrap Dump for table " + tblName); + log.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); TableExport.Paths exportPaths = new TableExport.Paths(work.astRepresentationForErrorMsg, dbRootMetadata, dbRootData, tblName, conf, true); @@ -941,7 +938,7 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { - LOG.info("REPL DUMP thread sleep interrupted", e); + log.info("REPL DUMP thread sleep interrupted", e); } validTxnList = getTxnMgr().getValidTxns(); } @@ -953,7 +950,7 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx validTxnList = getTxnMgr().getValidTxns(); if (validTxnList.getMinOpenTxn() != null) { openTxns = getOpenTxns(validTxnList); - LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. 
" + + log.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " + "However, this is rare case that shouldn't happen.", openTxns); throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons."); } @@ -992,7 +989,7 @@ private Path getLatestDumpPath(Path dumpRoot) throws IOException { if (statuses.length > 0) { FileStatus latestValidStatus = statuses[0]; for (FileStatus status : statuses) { - LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + log.info("Evaluating previous dump dir path:{}", status.getPath()); if (status.getModificationTime() > latestValidStatus.getModificationTime()) { latestValidStatus = status; } @@ -1049,7 +1046,7 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv } catch (NoSuchObjectException e) { // Bootstrap constraint dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. - LOG.debug(e.getMessage()); + log.debug(e.getMessage()); } } @@ -1057,7 +1054,7 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv try { HiveWrapper.Tuple tuple = new HiveWrapper(hiveDb, dbName).function(functionName); if (tuple.object.getResourceUris().isEmpty()) { - LOG.warn("Not replicating function: " + functionName + " as it seems to have been created " + log.warn("Not replicating function: " + functionName + " as it seems to have been created " + "without USING clause"); return null; } @@ -1065,7 +1062,7 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv } catch (HiveException e) { //This can happen as we are querying the getFunctions before we are getting the actual function //in between there can be a drop function by a user in which case our call will fail. 
- LOG.info("Function " + functionName + log.info("Function " + functionName + " could not be found, we are ignoring it as it can be a valid state ", e); return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 7a309626cc..dcfc575ec2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -116,10 +116,10 @@ public int execute() { return executeBootStrapLoad(); } } catch (RuntimeException e) { - LOG.error("replication failed with run time exception", e); + log.error("replication failed with run time exception", e); throw e; } catch (Exception e) { - LOG.error("replication failed", e); + log.error("replication failed", e); setException(e); return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } @@ -132,7 +132,7 @@ private boolean shouldLoadAuthorizationMetadata() { private void initiateAuthorizationLoadTask() throws SemanticException { if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { Path rangerLoadRoot = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR); - LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot); + log.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot); RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn); Task rangerLoadTask = TaskFactory.get(rangerLoadWork, conf); if (childTasks == null) { @@ -287,12 +287,12 @@ a database ( directory ) query id -- generated in compile phase , adding a additional UUID to the end to print each run in separate files. */ - LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); + log.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); // Populate the driver context with the scratch dir info from the repl context, so that the // temp dirs will be cleaned up later context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs()); createReplLoadCompleteAckTask(); - LOG.info("completed load task run : {}", work.executedLoadTask()); + log.info("Completed load task run : {}", work.executedLoadTask()); return 0; } @@ -394,8 +394,8 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep for (String table : tableNames) { db.dropTable(dbName + "." + table, true); } - LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.", - dbName); + log.info("Tables in the Database: {} that are excluded in the replication scope are dropped.", + dbName); } private void createReplLoadCompleteAckTask() { @@ -542,7 +542,7 @@ private int executeIncrementalLoad() throws Exception { TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf); DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask)); work.setLastReplIDUpdated(true); - LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid); + log.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid); } } // Once all the incremental events are applied, enable bootstrap of tables if exist. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java index 5abfa4de51..21986058fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java @@ -56,7 +56,7 @@ public int execute() { } } catch (TException | HiveException e) { setException(e); - LOG.error("Failed", e); + log.error("Failed", e); return 1; } return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index f401b4dcea..d314691f0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -40,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; import org.apache.hive.spark.counter.SparkCounter; import org.apache.hive.spark.counter.SparkCounters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -79,8 +77,7 @@ public class SparkTask extends Task { private static final String CLASS_NAME = SparkTask.class.getName(); - private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private static final LogHelper console = new LogHelper(LOG); + private transient final LogHelper console; private PerfLogger perfLogger; private static final long serialVersionUID = 1L; // The id of the actual Spark job @@ -99,6 +96,12 @@ private transient boolean isShutdown = false; private transient boolean jobKilled = false; + + public SparkTask() { + super(); + this.console = new LogHelper(log); + } + @Override public int execute() { @@ -122,7 +125,7 @@ public int execute() { // If the driver context has been shutdown (due to query cancellation) kill the Spark job if (taskQueue.isShutdown()) { - LOG.warn("Killing Spark job"); + log.warn("Killing Spark job"); killJob(); throw new HiveException(String.format("Spark task %s cancelled for query %s", getId(), sparkWork.getQueryId())); } @@ -133,7 +136,7 @@ public int execute() { // Add Spark job handle id to the Hive History addToHistory(Keys.SPARK_JOB_HANDLE_ID, jobRef.getJobId()); - LOG.debug("Starting Spark job with job handle id " + sparkJobHandleId); + log.debug("Starting Spark job with job handle id " + sparkJobHandleId); // Get the application id of the Spark app jobID = jobRef.getSparkJobStatus().getAppID(); @@ -168,21 +171,21 @@ public int execute() { } printConsoleMetrics(); printExcessiveGCWarning(); - if (LOG.isInfoEnabled() && sparkStatistics != null) { - LOG.info(sparkStatisticsToString(sparkStatistics, sparkJobID)); + if (log.isInfoEnabled() && sparkStatistics != null) { + log.info(sparkStatisticsToString(sparkStatistics, sparkJobID)); } - LOG.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " + + log.info("Successfully completed Spark job[" + sparkJobID + "] with application ID " + jobID + " and task ID " + getId()); } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here. But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. 
- LOG.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId); - LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID) + log.debug("Failed to submit Spark job with job handle id " + sparkJobHandleId); + log.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(jobID) ? "UNKNOWN" : jobID)); killJob(); } else if (rc == 4) { - LOG.info("The Spark job or one stage of it has too many tasks" + + log.info("The Spark job or one stage of it has too many tasks" + ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID); killJob(); } @@ -192,7 +195,7 @@ public int execute() { } sparkJobStatus.cleanup(); } catch (Exception e) { - LOG.error("Failed to execute Spark task \"" + getId() + "\"", e); + log.error("Failed to execute Spark task \"" + getId() + "\"", e); setException(e); if (e instanceof HiveException) { HiveException he = (HiveException) e; @@ -216,7 +219,7 @@ public int execute() { try { sparkSessionManager.returnSession(sparkSession); } catch (HiveException ex) { - LOG.error("Failed to return the session to SessionManager", ex); + log.error("Failed to return the session to SessionManager", ex); } } } @@ -465,7 +468,7 @@ public void shutdown() { } private void killJob() { - LOG.debug("Killing Spark job with job handle id " + sparkJobHandleId); + log.debug("Killing Spark job with job handle id " + sparkJobHandleId); boolean needToKillJob = false; if (jobRef != null && !jobKilled) { synchronized (this) { @@ -479,7 +482,7 @@ private void killJob() { try { jobRef.cancelJob(); } catch (Exception e) { - LOG.warn("Failed to kill Spark job", e); + log.warn("Failed to kill Spark job", e); } } } @@ -562,7 +565,7 @@ private void getSparkJobInfo(SparkJobStatus sparkJobStatus) { totalTaskCount = sumTotal; failedTaskCount = sumFailed; } catch (Exception e) { - LOG.error("Failed to get Spark job information", e); + log.error("Failed to get Spark job information", e); } } @@ -592,7 +595,7 @@ void setSparkException(SparkJobStatus sparkJobStatus, int rc) { if ((monitorError instanceof InterruptedException) || (monitorError instanceof HiveException && monitorError.getCause() instanceof InterruptedException)) { - LOG.info("Killing Spark job since query was interrupted"); + log.info("Killing Spark job since query was interrupted"); killJob(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index b1bf2f8903..bc0687264f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -98,7 +98,6 @@ public class TezTask extends Task { private static final String CLASS_NAME = TezTask.class.getName(); - private static transient Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction"; @@ -155,7 +154,7 @@ public int execute() { // we need to set the global to null to do that, this "reuse" may be pointless. 
TezSessionState session = sessionRef.value = ss.getTezSession(); if (session != null && !session.isOpen()) { - LOG.warn("The session: " + session + " has not been opened"); + log.warn("The session: " + session + " has not been opened"); } // We only need a username for UGI to use for groups; getGroups will fetch the groups @@ -169,7 +168,7 @@ public int execute() { try { groups = UserGroupInformation.createRemoteUser(userName).getGroups(); } catch (Exception ex) { - LOG.warn("Cannot obtain groups for " + userName, ex); + log.warn("Cannot obtain groups for " + userName, ex); } } MappingInput mi = new MappingInput(userName, groups, @@ -195,7 +194,7 @@ public int execute() { try { ss.setTezSession(session); - LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), + log.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId()); // Ensure the session is open and has the necessary local resources. @@ -253,7 +252,7 @@ public int execute() { counters = mergedCounters; } catch (Exception err) { // Don't fail execution due to counters - just don't print summary info - LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete. " + err, err); + log.warn("Failed to get counters. Ignoring, summary info will be incomplete. " + err, err); counters = null; } } finally { @@ -267,7 +266,7 @@ public int execute() { sessionRef.value.returnToSessionManager(); } } catch (Exception e) { - LOG.error("Failed to return session: {} to pool", session, e); + log.error("Failed to return session: {} to pool", session, e); throw e; } @@ -281,19 +280,19 @@ public int execute() { } } - if (LOG.isInfoEnabled() && counters != null + if (log.isInfoEnabled() && counters != null && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(conf))) { for (CounterGroup group: counters) { - LOG.info(group.getDisplayName() +":"); + log.info(group.getDisplayName() +":"); for (TezCounter counter: group) { - LOG.info(" "+counter.getDisplayName()+": "+counter.getValue()); + log.info(" "+counter.getDisplayName()+": "+counter.getValue()); } } } updateNumRows(); } catch (Exception e) { - LOG.error("Failed to execute tez graph.", e); + log.error("Failed to execute tez graph.", e); setException(e); // rc will be 1 at this point indicating failure. 
} finally { @@ -312,7 +311,7 @@ public int execute() { ctx.clear(); } catch (Exception e) { /*best effort*/ - LOG.warn("Failed to clean up after tez job", e); + log.warn("Failed to clean up after tez job", e); } } // need to either move tmp files or remove them @@ -354,22 +353,22 @@ private String getUserNameForGroups(SessionState ss) { private void closeDagClientOnCancellation(DAGClient dagClient) { try { dagClient.tryKillDAG(); - LOG.info("Waiting for Tez task to shut down: " + this); + log.info("Waiting for Tez task to shut down: " + this); dagClient.waitForCompletion(); } catch (Exception ex) { - LOG.warn("Failed to shut down TezTask" + this, ex); + log.warn("Failed to shut down TezTask" + this, ex); } closeDagClientWithoutEx(dagClient); } private void logResources(List additionalLr) { // log which resources we're adding (apart from the hive exec) - if (!LOG.isDebugEnabled()) return; + if (!log.isDebugEnabled()) return; if (additionalLr == null || additionalLr.size() == 0) { - LOG.debug("No local resources to process (other than hive-exec)"); + log.debug("No local resources to process (other than hive-exec)"); } else { for (LocalResource lr: additionalLr) { - LOG.debug("Adding local resource: " + lr.getResource()); + log.debug("Adding local resource: " + lr.getResource()); } } } @@ -387,10 +386,10 @@ void ensureSessionHasResources( // of it, in non-pool case perf doesn't matter so we might as well open at get time // and then call update like we do in the else. // Can happen if the user sets the tez flag after the session was established. - LOG.info("Tez session hasn't been created yet. Opening session"); + log.info("Tez session hasn't been created yet. Opening session"); session.open(nonConfResources); } else { - LOG.info("Session is already open"); + log.info("Session is already open"); session.ensureLocalResources(conf, nonConfResources); } } @@ -416,7 +415,7 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx, // the name of the dag is what is displayed in the AM/Job UI String dagName = utils.createDagName(conf, queryPlan); - LOG.info("Dag name: " + dagName); + log.info("Dag name: " + dagName); DAG dag = DAG.create(dagName); // set some info for the query @@ -424,12 +423,7 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx, .put("description", ctx.getCmd()); String dagInfo = json.toString(); - String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); - dag.setConf(HiveConf.ConfVars.HIVEQUERYID.varname, queryId); - - if (LOG.isDebugEnabled()) { - LOG.debug("DagInfo: " + dagInfo); - } + log.debug("DagInfo: {}", dagInfo); dag.setDAGInfo(dagInfo); dag.setCredentials(conf.getCredentials()); @@ -495,12 +489,12 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx, } int newValue = originalValue / tezWork.getChildren(workUnit).size(); wxConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Integer.toString(newValue)); - LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue); + log.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue); } if (workUnit.getReservedMemoryMB() > 0) { // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed double frac = DagUtils.adjustMemoryReserveFraction(workUnit.getReservedMemoryMB(), super.conf); - LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac); + log.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac); wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, 
Double.toString(frac)); } // Otherwise just leave it up to Tez to decide how much memory to allocate dag.addVertex(wx); @@ -545,10 +539,9 @@ private static void setAccessControlsForCurrentUser(DAG dag, String queryId, String modifyStr = Utilities.getAclStringWithHiveModification(conf, TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, user, loginUser); - if (LOG.isDebugEnabled()) { - LOG.debug("Setting Tez DAG access for queryId={} with viewAclString={}, modifyStr={}", - queryId, viewStr, modifyStr); - } + LoggerFactory.getLogger(TezTask.class).debug( + "Setting Tez DAG access for queryId={} with viewAclString={}, modifyStr={}", queryId, viewStr, modifyStr); + // set permissions for current user on DAG DAGAccessControls ac = new DAGAccessControls(viewStr, modifyStr); dag.setAccessControls(ac); @@ -637,7 +630,7 @@ private static void closeDagClientWithoutEx(DAGClient dagClient) { try { dagClient.close(); } catch (Exception e) { - LOG.warn("Failed to close DagClient", e); + LoggerFactory.getLogger(TezTask.class).warn("Failed to close DagClient", e); } } @@ -713,7 +706,7 @@ public void shutdown() { dagClient = this.dagClient; // Don't set dagClient to null here - execute will only clean up operators if it's set. } - LOG.info("Shutting down Tez task " + this + " " + log.info("Shutting down Tez task " + this + " " + ((dagClient == null) ? " before submit" : "")); if (dagClient == null) return; closeDagClientOnCancellation(dagClient); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index 7fb3878ee6..5f15a6ce5d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -149,7 +149,7 @@ public int execute() { } // make this client wait if job trcker is not behaving well. - Throttle.checkJobTracker(job, LOG); + Throttle.checkJobTracker(job, log); // Finally SUBMIT the JOB! rj = jc.submitJob(job); @@ -194,7 +194,7 @@ public int execute() { } } catch (Exception e) { // jobClose needs to execute successfully otherwise fail task - LOG.warn("Job close failed ",e); + log.warn("Job close failed ",e); if (success) { setException(e); success = false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 0458c946c0..8dcbd06d9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -93,7 +93,7 @@ public int execute() { ctxCreated = true; } }catch (IOException e) { - LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + log.error("Error", e); setException(e); return 5; } @@ -118,7 +118,7 @@ public int execute() { } String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT); - LOG.info("Using " + inpFormat); + log.info("Using " + inpFormat); try { job.setInputFormat(JavaUtils.loadClass(inpFormat)); @@ -135,7 +135,7 @@ public int execute() { } } catch (IOException e) { setException(e); - LOG.error("Can't make path " + outputPath, e); + log.error("Can't make path " + outputPath, e); return 6; } @@ -181,7 +181,7 @@ public int execute() { } // make this client wait if job trcker is not behaving well. - Throttle.checkJobTracker(job, LOG); + Throttle.checkJobTracker(job, log); // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); @@ -191,9 +191,7 @@ public int execute() { } catch (Exception e) { String mesg = rj != null ? ("Ended Job = " + rj.getJobID()) : "Job Submission failed"; - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang3.StringUtils - LOG.error(mesg, org.apache.hadoop.util.StringUtils.stringifyException(e)); + log.error(mesg, e); setException(e); success = false; @@ -211,7 +209,7 @@ public int execute() { ColumnTruncateMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx(), null); } catch (Exception e) { - LOG.warn("Failed while cleaning up ", e); + log.warn("Failed while cleaning up ", e); } finally { HadoopJobExecHelper.runningJobs.remove(rj); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 163d439018..80a09ea575 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -644,7 +644,7 @@ public boolean addDependentTask(Task dependent) { count++; System.err.println("YAH:getDepTasks got called!"); (new Exception()).printStackTrace(System.err); - LOG.info("YAH!getDepTasks", new Exception()); + log.info("YAH!getDepTasks", new Exception()); return super.getDependentTasks(); }
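The SparkTask hunks above show the knock-on effect of losing the static LOG: the console helper can no longer be a static field, so it becomes a transient instance field built in the constructor from the inherited logger. Below is a minimal sketch of that arrangement, with an invented class name standing in for SparkTask and the inherited log field declared locally so the sketch compiles on its own.

// Sketch only: constructor-built console helper over the instance logger,
// mirroring the SparkTask() constructor added in the diff above.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

public class ConsoleTaskSketch {
  // Stand-in for the logger assumed to be inherited from the Task base class.
  protected final transient Logger log = LoggerFactory.getLogger(getClass());

  // No longer static: it now depends on the per-instance logger.
  private final transient LogHelper console;

  public ConsoleTaskSketch() {
    // SessionState.LogHelper wraps a logger and, as used in Hive, also routes
    // messages to the session's console streams.
    this.console = new LogHelper(log);
  }
}

Static helpers are the other special case: where no instance is available (for example closeDagClientWithoutEx and the access-control logging in TezTask), the hunks fall back to LoggerFactory.getLogger(TezTask.class) at the call site instead of using the inherited field.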