diff --git common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java
new file mode 100644
index 0000000000000000000000000000000000000000..8b526704dcd52fd47030930ccf8afccc1daf58df
--- /dev/null
+++ common/src/java/org/apache/hadoop/hive/common/log/LogRedirector.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.log;
+
+import org.slf4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import java.util.List;
+
+/**
+ * Class used to redirect output read from a stream to a logger.
+ */
+public class LogRedirector implements Runnable {
+
+  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
+
+  public interface ParentHealthCheck {
+    boolean isAlive();
+  }
+
+  private final Logger logger;
+  private final BufferedReader in;
+  private final ParentHealthCheck parent;
+  private List<String> errLogs;
+  private int numErrLogLines = 0;
+
+  public LogRedirector(InputStream in, Logger logger, ParentHealthCheck parent) {
+    this.in = new BufferedReader(new InputStreamReader(in));
+    this.parent = parent;
+    this.logger = logger;
+  }
+
+  public LogRedirector(InputStream in, Logger logger, List<String> errLogs,
+      ParentHealthCheck parent) {
+    this.in = new BufferedReader(new InputStreamReader(in));
+    this.errLogs = errLogs;
+    this.parent = parent;
+    this.logger = logger;
+  }
+
+  @Override
+  public void run() {
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        logger.info(line);
+        if (errLogs != null) {
+          if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
+            errLogs.add(line);
+          }
+        }
+      }
+    } catch (IOException e) {
+      if (parent.isAlive()) {
+        logger.warn("I/O error in redirector thread.", e);
+      } else {
+        // When stopping the process we are redirecting from,
+        // the streams might be closed during reading.
+        // We should not log the related exceptions at a visible level,
+        // as they might mislead the user.
+        logger.debug("I/O error in redirector thread while stopping the remote driver", e);
+      }
+    } catch (Exception e) {
+      logger.warn("Error in redirector thread.", e);
+    }
+  }
+
+  /**
+   * Start the LogRedirector in a new thread.
+   * @param name name of the new thread
+   * @param redirector redirector to start
+   */
+  public static void redirect(String name, LogRedirector redirector) {
+    Thread thread = new Thread(redirector);
+    thread.setName(name);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+}
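Since the class above is the centerpiece of this patch, a short usage sketch may help reviewers. Everything except the echoed command and the thread names comes straight from the API introduced above; the child process is a hypothetical stand-in:

```java
import java.io.IOException;

import org.apache.hadoop.hive.common.log.LogRedirector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogRedirectorSketch {

  private static final Logger LOG = LoggerFactory.getLogger(LogRedirectorSketch.class);

  public static void main(String[] args) throws IOException {
    // Hypothetical child process; anything writing to stdout/stderr works.
    final Process child = new ProcessBuilder("echo", "hello from child").start();

    // The health check decides at which level stream failures are logged:
    // WARN while the watched process is alive (a real problem),
    // DEBUG once it has been stopped (an expected side effect of teardown).
    LogRedirector.ParentHealthCheck healthCheck = new LogRedirector.ParentHealthCheck() {
      @Override
      public boolean isAlive() {
        return child.isAlive();
      }
    };

    // One daemon thread per stream; each line read is forwarded to the logger.
    LogRedirector.redirect("child-stdout-redir",
        new LogRedirector(child.getInputStream(), LOG, healthCheck));
    LogRedirector.redirect("child-stderr-redir",
        new LogRedirector(child.getErrorStream(), LOG, healthCheck));
  }
}
```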
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 9dfefeece56d7c3127c6921b5ad880fcaa926d56..e5d0062e63b6fb4ab6c5738d32bc2f77a11b956a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.io.CachingPrintStream;
+import org.apache.hadoop.hive.common.log.LogRedirector;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -79,6 +80,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.StreamPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,6 +327,20 @@ public int executeInChildVM(DriverContext driverContext) {
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
 
+      final LogRedirector.ParentHealthCheck healthCheck = new LogRedirector.ParentHealthCheck() {
+
+        @Override
+        public boolean isAlive() {
+          return executor.isAlive();
+        }
+      };
+      LogRedirector.redirect(
+          Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stdout",
+          new LogRedirector(executor.getInputStream(), LOG, healthCheck));
+      LogRedirector.redirect(
+          Thread.currentThread().getName() + "-LocalTask-" + getName() + "-stderr",
+          new LogRedirector(executor.getErrorStream(), LOG, healthCheck));
+
       CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
 
       StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
@@ -384,14 +400,19 @@ public int executeInProcess(DriverContext driverContext) {
       console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
           + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable throwable) {
+      int retVal;
+      String message;
       if (throwable instanceof OutOfMemoryError
           || (throwable instanceof MapJoinMemoryExhaustionError)) {
-        l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
-        return 3;
+        message = "Hive Runtime Error: Map local work exhausted memory";
+        retVal = 3;
       } else {
-        l4j.error("Hive Runtime Error: Map local work failed", throwable);
-        return 2;
+        message = "Hive Runtime Error: Map local work failed";
+        retVal = 2;
       }
+      l4j.error(message, throwable);
+      console.printError(message, HiveStringUtils.stringifyException(throwable));
+      return retVal;
     }
     return 0;
   }
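The refactored catch block above routes both failure modes through a single exit path: the message is logged via l4j and also pushed to the session console together with the full stack trace, which is why the golden files below each gain a visible "Hive Runtime Error" line ahead of the FAILED status. HiveStringUtils.stringifyException is, functionally, a stack-trace-to-String helper; an illustrative equivalent (not Hive's actual source) for readers who don't have the class at hand:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Illustrative equivalent of HiveStringUtils.stringifyException(Throwable):
// render a throwable's stack trace into a String so that
// console.printError(message, detail) can display it.
public final class StringifyExceptionSketch {

  static String stringifyException(Throwable e) {
    StringWriter stm = new StringWriter();
    PrintWriter wrt = new PrintWriter(stm);
    e.printStackTrace(wrt);  // write the full trace into the buffer
    wrt.close();
    return stm.toString();
  }

  public static void main(String[] args) {
    System.out.println(stringifyException(new RuntimeException("boom")));
  }
}
```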
diff --git ql/src/test/results/clientpositive/auto_join25.q.out ql/src/test/results/clientpositive/auto_join25.q.out
index 534bdb6ff0461614deab6e6fb84409cd34422f96..d24e0c3945d66067d1eb4480f65a665f60e26d6f 100644
--- ql/src/test/results/clientpositive/auto_join25.q.out
+++ ql/src/test/results/clientpositive/auto_join25.q.out
@@ -18,6 +18,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
@@ -62,8 +63,10 @@ INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j2
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
@@ -105,6 +108,7 @@ INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)
diff --git ql/src/test/results/clientpositive/auto_join_without_localtask.q.out ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
index 57f00674de30087143ef579cd0cff2bc024f529f..a8ae0007ddd4e7cdc300154ec896aa85e6b76f6a 100644
--- ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
+++ ql/src/test/results/clientpositive/auto_join_without_localtask.q.out
@@ -1045,8 +1045,10 @@ PREHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: select a.* from src a join src b on a.key=b.key join src c on a.value=c.value where a.key>100 order by a.key, a.value limit 40
diff --git ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
index f0e77f00afe0c753fbaa402c27662b2559c3e4d7..1b3d741206daf85f8ed7a2969bc8adbfefa68468 100644
--- ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
+++ ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
@@ -273,6 +273,7 @@ PREHOOK: Input: default@test_table1@ds=1
 PREHOOK: Input: default@test_table2
 PREHOOK: Input: default@test_table2@ds=1
 PREHOOK: Output: default@test_table3@ds=1
+Hive Runtime Error: Map local work failed
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
@@ -551,6 +552,7 @@ PREHOOK: Input: default@test_table1@ds=1
 PREHOOK: Input: default@test_table2
 PREHOOK: Input: default@test_table2@ds=1
 PREHOOK: Output: default@test_table3@ds=1
+Hive Runtime Error: Map local work failed
 FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
diff --git ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
index 52ebe5aa8db035434c1bb0978b7b3ac2a25e062a..639df07746f2c39037341c259e9bcab69754dc9f 100644
--- ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
+++ ql/src/test/results/clientpositive/infer_bucket_sort_convert_join.q.out
@@ -62,6 +62,7 @@ SELECT a.key, b.value FROM src a JOIN src b ON a.key = b.key
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@test_table@part=1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: INSERT OVERWRITE TABLE test_table PARTITION (part = '1')
diff --git ql/src/test/results/clientpositive/mapjoin_hook.q.out ql/src/test/results/clientpositive/mapjoin_hook.q.out
index a9f9be3a4dca764f70303412cc07dd31f7cece1c..f80a26a252b1457b8374d3040f255e77cf94db47 100644
--- ql/src/test/results/clientpositive/mapjoin_hook.q.out
+++ ql/src/test/results/clientpositive/mapjoin_hook.q.out
@@ -38,6 +38,7 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 1 BACKUP_COMMON_JOIN: 1
@@ -51,8 +52,10 @@ INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest1
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+Hive Runtime Error: Map local work exhausted memory
 FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
 ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 [MapJoinCounter PostHook] COMMON_JOIN: 0 HINTED_MAPJOIN: 0 HINTED_MAPJOIN_LOCAL: 0 CONVERTED_MAPJOIN: 0 CONVERTED_MAPJOIN_LOCAL: 2 BACKUP_COMMON_JOIN: 2
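The SparkClientImpl change below deletes the private Redirector inner class and its redirect() helper in favor of the shared LogRedirector above; behavior is unchanged, including the capture of up to MAX_ERR_LOG_LINES_FOR_RPC (1000) stderr lines used when reporting a RemoteDriver startup failure. A minimal sketch of that capture path, with a hypothetical failing child command:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.common.log.LogRedirector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrLogCaptureSketch {

  private static final Logger LOG = LoggerFactory.getLogger(ErrLogCaptureSketch.class);

  public static void main(String[] args) throws IOException, InterruptedException {
    // Hypothetical child that fails and writes to stderr.
    final Process child = new ProcessBuilder("ls", "/no/such/path").start();

    // Must be synchronized: the redirector thread appends while other
    // threads may read the list to build an error message.
    List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());

    LogRedirector.ParentHealthCheck healthCheck = new LogRedirector.ParentHealthCheck() {
      @Override
      public boolean isAlive() {
        return child.isAlive();
      }
    };

    // Four-argument constructor: stderr lines are logged *and* captured.
    LogRedirector.redirect("child-stderr-redir",
        new LogRedirector(child.getErrorStream(), LOG, childErrorLog, healthCheck));

    child.waitFor();
    // The redirector runs on a daemon thread, so give it a moment to drain.
    Thread.sleep(100);
    System.err.println("captured stderr: " + childErrorLog);
  }
}
```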
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 03e773a803f1dc7edfb9be8d795aba32535bd433..275c20041391f67166252e1d9fa28bebf3b62739 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -32,13 +32,10 @@
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
@@ -53,9 +50,9 @@
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.log.LogRedirector;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -75,7 +72,6 @@
   private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
-  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
 
   private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -490,8 +486,17 @@ public void run() {
         final Process child = pb.start();
         String threadName = Thread.currentThread().getName();
         final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
-        redirect("RemoteDriver-stdout-redir-" + threadName, new Redirector(child.getInputStream()));
-        redirect("RemoteDriver-stderr-redir-" + threadName, new Redirector(child.getErrorStream(), childErrorLog));
+        final LogRedirector.ParentHealthCheck healthCheck = new LogRedirector.ParentHealthCheck() {
+
+          @Override
+          public boolean isAlive() {
+            return isAlive;
+          }
+        };
+        LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
+            new LogRedirector(child.getInputStream(), LOG, healthCheck));
+        LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
+            new LogRedirector(child.getErrorStream(), LOG, childErrorLog, healthCheck));
 
         runnable = new Runnable() {
           @Override
@@ -542,13 +547,6 @@ private String getSparkJobCredentialProviderPassword() {
     return null;
   }
 
-  private void redirect(String name, Redirector redirector) {
-    Thread thread = new Thread(redirector);
-    thread.setName(name);
-    thread.setDaemon(true);
-    thread.start();
-  }
-
   private class ClientProtocol extends BaseProtocol {
 
     <T extends Serializable> JobHandleImpl<T> submit(Job<T> job, List<JobHandle.Listener<T>> listeners) {
@@ -653,48 +651,6 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
   }
 
-  private class Redirector implements Runnable {
-
-    private final BufferedReader in;
-    private List<String> errLogs;
-    private int numErrLogLines = 0;
-
-    Redirector(InputStream in) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-    }
-
-    Redirector(InputStream in, List<String> errLogs) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-      this.errLogs = errLogs;
-    }
-
-    @Override
-    public void run() {
-      try {
-        String line = null;
-        while ((line = in.readLine()) != null) {
-          LOG.info(line);
-          if (errLogs != null) {
-            if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
-              errLogs.add(line);
-            }
-          }
-        }
-      } catch (IOException e) {
-        if (isAlive) {
-          LOG.warn("I/O error in redirector thread.", e);
-        } else {
-          // When stopping the remote driver the process might be destroyed during reading from the stream.
-          // We should not log the related exceptions at a visible level as they might mislead the user.
-          LOG.debug("I/O error in redirector thread while stopping the remote driver", e);
-        }
-      } catch (Exception e) {
-        LOG.warn("Error in redirector thread.", e);
-      }
-    }
-
-  }
-
   private static class AddJarJob implements Job<Serializable> {
 
     private static final long serialVersionUID = 1L;