diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index 6dfaa9f..136c7df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -20,8 +20,13 @@ import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; /** * The class to store query level info such as queryId. Multiple queries can run @@ -109,4 +114,69 @@ public void setCommandType(HiveOperation commandType) { public HiveConf getConf() { return queryConf; } + + /** + * This class provides a logger helper to print the log to the log file and + * deliver the log to the client console through OperationLog as well. + */ + public static class ClientConsoleLogger { + + private Logger LOG; + private boolean isSilent; + private OperationLog clientLogger = OperationLog.getCurrentOperationLog(); + + public ClientConsoleLogger(Logger LOG) { + this(LOG, false); + } + + public ClientConsoleLogger(Logger LOG, boolean isSilent) { + this.LOG = LOG; + this.isSilent = isSilent; + } + + public boolean getIsSilent() { + SessionState ss = SessionState.get(); + // use the session or the one supplied in constructor + return (ss != null) ? 
ss.getIsSilent() : isSilent; + } + + public void logInfo(String info) { + logInfo(info, null); + } + + public void logInfo(String info, String detail) { + LOG.info(info + StringUtils.defaultString(detail)); + } + + public void printInfo(String info) { + printInfo(info, null); + } + + public void printInfo(String info, String detail) { + if (!getIsSilent()) { + if (clientLogger != null) { + clientLogger.writeOperationLog(LoggingLevel.EXECUTION, info + "\n"); + } else { + // In a subprocess, OperationLog doesn't exist. The log is printed to console + // and redirected to OperationLog. Used by MapredLocalTask. + System.out.println(info); + } + } + LOG.info(info + StringUtils.defaultString(detail)); + } + + + public void printError(String error) { + printError(error, null); + } + + public void printError(String error, String detail) { + if (clientLogger != null) { + clientLogger.writeOperationLog(LoggingLevel.EXECUTION, error + "\n"); + } else { + System.err.println(error); + } + LOG.error(error + StringUtils.defaultString(detail)); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e1bd291..0b27980 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -42,7 +41,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +56,7 
@@ public transient TaskHandle taskHandle; protected transient HiveConf conf; protected transient QueryState queryState; - protected transient LogHelper console; + protected transient QueryState.ClientConsoleLogger console; protected transient QueryPlan queryPlan; protected transient DriverContext driverContext; protected transient boolean clonedConf = false; @@ -157,7 +155,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext this.queryState = queryState; this.conf = queryState.getConf(); this.driverContext = driverContext; - console = new LogHelper(LOG); + console = new QueryState.ClientConsoleLogger(LOG); } public void setQueryDisplay(QueryDisplay queryDisplay) { this.queryDisplay = queryDisplay; @@ -603,7 +601,7 @@ protected void setException(Throwable ex) { exception = ex; } - public void setConsole(LogHelper console) { + public void setConsole(QueryState.ClientConsoleLogger console) { this.console = console; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 34b683c..e160dd8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -79,7 +79,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -126,7 +125,7 @@ */ public ExecDriver() { super(); - console = new LogHelper(LOG); + console = new QueryState.ClientConsoleLogger(LOG); job = new JobConf(ExecDriver.class); this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this); } @@ -186,7 +185,7 @@ public void initialize(QueryState queryState, 
QueryPlan queryPlan, DriverContext public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveException { setWork(plan); this.job = job; - console = new LogHelper(LOG, isSilent); + console = new QueryState.ClientConsoleLogger(LOG, isSilent); this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this); } @@ -695,7 +694,7 @@ public static void main(String[] args) throws IOException, HiveException { } Logger LOG = LoggerFactory.getLogger(ExecDriver.class.getName()); - LogHelper console = new LogHelper(LOG, isSilent); + QueryState.ClientConsoleLogger console = new QueryState.ClientConsoleLogger(LOG, isSilent); if (planFileName == null) { console.printError("Must specify Plan File Name"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 3c07197..c246be2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.MapRedStats; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskHandle; @@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; @@ -76,7 +76,7 @@ protected transient int lastReduceProgress; public transient JobID jobId; - private final LogHelper console; + private final QueryState.ClientConsoleLogger 
console; private final HadoopJobExecHook callBackObj; private final String queryId; @@ -144,7 +144,7 @@ public void setJobId(JobID jobId) { this.jobId = jobId; } - public HadoopJobExecHelper(JobConf job, LogHelper console, + public HadoopJobExecHelper(JobConf job, QueryState.ClientConsoleLogger console, Task task, HadoopJobExecHook hookCallBack) { this.queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID, "unknown-" + System.currentTimeMillis()); this.job = job; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java index d320536..cfaa434 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java @@ -32,9 +32,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution; import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; @@ -47,7 +47,7 @@ public class JobDebugger implements Runnable { private final JobConf conf; private final RunningJob rj; - private final LogHelper console; + private final QueryState.ClientConsoleLogger console; private final Map>> stackTraces; // Mapping from task ID to the number of failures private final Map failures = new HashMap(); @@ -99,14 +99,14 @@ public int getErrorCode() { } } - public JobDebugger(JobConf conf, RunningJob rj, LogHelper console) { + public JobDebugger(JobConf conf, RunningJob rj, QueryState.ClientConsoleLogger console) { this.conf = conf; this.rj = rj; this.console = console; this.stackTraces = null; } - public JobDebugger(JobConf conf, RunningJob rj, LogHelper console, + public JobDebugger(JobConf 
conf, RunningJob rj, QueryState.ClientConsoleLogger console, Map>> stackTraces) { this.conf = conf; this.rj = rj; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 591ea97..f12a351 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -117,7 +116,7 @@ public MapredLocalTask() { public MapredLocalTask(MapredLocalWork plan, JobConf job, boolean isSilent) throws HiveException { setWork(plan); this.job = job; - console = new LogHelper(LOG, isSilent); + console = new QueryState.ClientConsoleLogger(LOG, isSilent); } public void setExecContext(ExecMapperContext execContext) { @@ -331,9 +330,9 @@ public int executeInChildVM(DriverContext driverContext) { StreamPrinter errPrinter; OperationLog operationLog = OperationLog.getCurrentOperationLog(); if (operationLog != null) { - outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out, + outPrinter = new StreamPrinter(executor.getInputStream(), null, operationLog.getPrintStream()); - errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream, + errPrinter = new StreamPrinter(executor.getErrorStream(), null, operationLog.getPrintStream()); } else { outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java index bd537cd..f98ad57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java @@ -20,9 +20,6 @@ import java.io.IOException; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,7 +30,6 @@ import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.shims.CombineHiveKey; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; @@ -42,6 +38,9 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("deprecation") public class ColumnTruncateMapper extends MapReduceBase implements @@ -230,7 +229,7 @@ public static Path backupOutputPath(FileSystem fs, Path outpath, JobConf job) } public static void jobClose(Path outputPath, boolean success, JobConf job, - LogHelper console, DynamicPartitionCtx dynPartCtx, Reporter reporter + DynamicPartitionCtx dynPartCtx, Reporter reporter ) throws HiveException, IOException { FileSystem fs = outputPath.getFileSystem(job); Path backupPath = backupOutputPath(fs, outputPath, job); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 8e89b71..d45e3bd 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -217,7 +217,7 @@ public int execute(DriverContext driverContext) { rj.killJob(); } } - ColumnTruncateMapper.jobClose(outputPath, success, job, console, + ColumnTruncateMapper.jobClose(outputPath, success, job, work.getDynPartCtx(), null); } catch (Exception e) { LOG.warn("Failed while cleaning up ", e); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 2b52056..ab0d1ad 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -25,10 +25,10 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; import java.io.IOException; import java.util.ArrayList; @@ -47,6 +47,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -58,7 +59,6 @@ import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; @@ -163,7 +163,7 @@ public Edge answer(InvocationOnMock 
invocation) throws Throwable { task = new TezTask(utils); task.setWork(work); - task.setConsole(mock(LogHelper.class)); + task.setConsole(mock(QueryState.ClientConsoleLogger.class)); QueryPlan mockQueryPlan = mock(QueryPlan.class); doReturn(UUID.randomUUID().toString()).when(mockQueryPlan).getQueryId(); task.setQueryPlan(mockQueryPlan);