Flink / FLINK-5298

TaskManager crashes when TM log does not exist

    Details

      Description

      java.io.FileNotFoundException: flink-taskmanager.out (No such file or directory)
          at java.io.FileInputStream.open0(Native Method)
          at java.io.FileInputStream.open(FileInputStream.java:195)
          at java.io.FileInputStream.<init>(FileInputStream.java:138)
          at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
          at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
          at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
          at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
          at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
          at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
          at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
          at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
          at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
          at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
          at akka.actor.ActorCell.invoke(ActorCell.scala:487)
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
          at akka.dispatch.Mailbox.run(Mailbox.scala:221)
          at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      2016-12-08 16:45:14,995 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping TaskManager akka://flink/user/taskmanager#1361882659.
      2016-12-08 16:45:14,995 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Disassociating from JobManager
      2016-12-08 16:45:14,997 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
      2016-12-08 16:45:15,006 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /tmp/flink-io-e61f717b-630c-4a2a-b3e3-62ccb40aa2f9
      2016-12-08 16:45:15,006 INFO  org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting down the network environment and its components.
      2016-12-08 16:45:15,008 INFO  org.apache.flink.runtime.io.network.netty.NettyClient         - Successful shutdown (took 1 ms).
      2016-12-08 16:45:15,009 INFO  org.apache.flink.runtime.io.network.netty.NettyServer         - Successful shutdown (took 0 ms).
      2016-12-08 16:45:15,020 INFO  org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Task manager akka://flink/user/taskmanager is completely shut down.
      2016-12-08 16:45:15,023 ERROR org.apache.flink.runtime.taskmanager.TaskManager              - Actor akka://flink/user/taskmanager#1361882659 terminated, stopping process...
      


          Activity

          Makman2 Mischa Krüger added a comment -

          This happened when the log was requested via the web UI.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/2974

          FLINK-5298 TM checks that log file exists

          This PR slightly modifies `TaskManager#handleRequestTaskManagerLog`. First, it verifies that the log file actually exists before opening it. Second, if `logFilePathOption` is empty, it no longer throws an IOException (which could crash the TM) but instead forwards the exception to the sender.
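Below is a minimal Java sketch of the two behaviors the PR describes. It uses a simplified handler and is not the Flink code. The sketch makes these assumptions:

- The log path is an `Optional<String>`, standing in for `logFilePathOption`.
- Replies go through a `Consumer<Object>` instead of `sender !`.
- `FailureReply` is a hypothetical stand-in for a failure message.
- Reading the file and returning its byte count stands in for the BLOB upload.

None of these names are Flink APIs, and the "no path configured" wording is illustrative.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.function.Consumer;

class LogRequestSketch {
    /** Hypothetical failure reply; not a Flink or Akka class. */
    static final class FailureReply {
        final Throwable cause;

        FailureReply(Throwable cause) {
            this.cause = cause;
        }
    }

    /** Answers a log request by replying, never by throwing, so the handler keeps running. */
    static void handleLogRequest(Optional<String> logFilePath, Consumer<Object> reply) {
        if (!logFilePath.isPresent()) {
            // Before the PR, this case threw an IOException inside the actor.
            reply.accept(new FailureReply(new IOException(
                "TaskManager log files are unavailable. No log file path is configured.")));
            return;
        }
        File file = new File(logFilePath.get());
        if (!file.exists()) {
            // New check: do not try to open a file that is not there.
            reply.accept(new FailureReply(new IOException(
                "TaskManager log files are unavailable. Log file does not exist.")));
            return;
        }
        // The file can still vanish between exists() and the open, so I/O errors become replies too.
        try (InputStream in = new FileInputStream(file)) {
            long size = 0;
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                size += read;
            }
            reply.accept(size); // stand-in for the uploaded BLOB key
        } catch (IOException e) {
            reply.accept(new FailureReply(e));
        }
    }
}
```

The `exists()` check alone does not close the gap. The file can be deleted between the check and the open (a time-of-check/time-of-use race), so the sketch still turns I/O errors into replies rather than exceptions.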

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5298_tm_log

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2974.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2974


          commit c135d1940e61a4a80b274042e6e095f3369ec911
          Author: zentol <chesnay@apache.org>
          Date: 2016-12-08T18:28:12Z

          FLINK-5298 TM checks that log file exists


          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/2974

          @zentol What is the new behavior? Does the web UI show the IOException?

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/2974

          This doesn't really address the 'root cause' here, which is that the .out file is missing (for Mesos deployments). While we could change `mesos-taskmanager.sh` to redirect the output, I honestly hesitate to, because Mesos already redirects the output to 'stdout' and 'stderr', and it has log-rolling features too. Therefore I think it would be a step backwards to redirect to `flink-taskmanager.out`.

          So, I think Flink should treat the lack of a log as a 'not applicable' situation, not an 'error' situation.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2974

          It should display "Fetching TaskManager log failed." and log the exception (see `TaskManagerLogHandler#respondAsLeader()`: `logPathFuture.exceptionally(...)`).

          This isn't a case that can only happen on Mesos. If the log is deleted while the TM is running, we have exactly the same problem, except that in this case it is in fact an error and should be displayed as such. The same applies if logging is broken.

          I agree that we should display something different if we know that no log file should exist; however, I simply don't know how, or whether, we can find that out. That may be something you could weigh in on.
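The web-side fallback zentol describes can be sketched with `java.util.concurrent.CompletableFuture`. Its `exceptionally` stage is similar in spirit to the `logPathFuture.exceptionally(...)` call the comment points to. This is a simplified model, not the handler's code:

- `responseBody` is a hypothetical name.
- The success text is hypothetical.
- Only the failure message is taken from the comment.

```java
import java.util.concurrent.CompletableFuture;

class LogHandlerFallbackSketch {
    static final String FAILURE_MESSAGE = "Fetching TaskManager log failed.";

    /** Turns the log-path future into the text shown to the user. */
    static CompletableFuture<String> responseBody(CompletableFuture<String> logPathFuture) {
        return logPathFuture
            .thenApply(path -> "Serving log file " + path) // hypothetical success text
            .exceptionally(failure -> {
                // Log the cause for operators, then show a fixed message to the user.
                System.err.println(FAILURE_MESSAGE + " Cause: " + failure);
                return FAILURE_MESSAGE;
            });
    }
}
```

With this shape, every failure surfaces as the same user-facing message, whether the path is unset, the file is missing, or the log was deleted. This matches treating the situation as an 'error' rather than 'not applicable'.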

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/2974

          @zentol I am fine with the 'error' behavior.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/2974

          What is the state of this PR? It would be great to merge this for the next release candidate.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2974

          @tillrohrmann well it still needs a review.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r96843830

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
          @@ -830,19 +830,24 @@ class TaskManager(
                       case StdOutFileRequest =>
                         new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
                     }
          -          val fis = new FileInputStream(file);
          -          Future {
          -            val client: BlobClient = blobService.get.createClient()
          -            client.put(fis);
          -          }(context.dispatcher)
          -            .onComplete {
          -              case Success(value) =>
          -                sender ! value
          -                fis.close()
          -              case Failure(e) =>
          -                sender ! e
          -                fis.close()
          +          if (file.exists()) {
          +            val fis = new FileInputStream(file);
          +            Future {
          +              val client: BlobClient = blobService.get.createClient()
          +              client.put(fis);
          +            }(context.dispatcher)
          +              .onComplete {
          +                case Success(value) =>
          +                  sender ! value
          +                  fis.close()
          +                case Failure(e) =>
          +                  sender ! e
          +                  fis.close()
          +              }(context.dispatcher)
          +          } else {
          +            sender ! new IOException("TaskManager log files are unavailable. " +
          +              "Log file does not exist.")
          --- End diff --

          Maybe we could add the path under which we've looked for the log file to the error message.
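Here is a small Java sketch of this suggestion, assuming the existence check runs on a `java.io.File`. The error message carries the absolute path that was checked. The sketch also mirrors the diff's `.out` rule, `logFilePath.substring(0, logFilePath.length - 4) + ".out"`. The class and method names are hypothetical, and the message wording is illustrative.

```java
import java.io.File;
import java.io.IOException;

class LogPathSketch {
    /** Same rule as the diff: replace the last four characters (e.g. ".log") with ".out". */
    static File stdOutFile(String logFilePath) {
        return new File(logFilePath.substring(0, logFilePath.length() - 4) + ".out");
    }

    /** Error for a missing log file that names the path that was actually checked. */
    static IOException missingLogFile(File file) {
        return new IOException("TaskManager log files are unavailable. "
            + "Log file could not be found at " + file.getAbsolutePath() + ".");
    }
}
```

The substring rule assumes a four-character suffix such as `.log`:

- A path shorter than four characters throws `StringIndexOutOfBoundsException`.
- A path with a different extension produces the wrong `.out` name.

Including the checked path in the message makes that kind of mismatch visible.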

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r96843747

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
          @@ -830,19 +830,24 @@ class TaskManager(
                       case StdOutFileRequest =>
                         new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
                     }
          -          val fis = new FileInputStream(file);
          -          Future {
          -            val client: BlobClient = blobService.get.createClient()
          -            client.put(fis);
          -          }(context.dispatcher)
          -            .onComplete {
          -              case Success(value) =>
          -                sender ! value
          -                fis.close()
          -              case Failure(e) =>
          -                sender ! e
          -                fis.close()
          +          if (file.exists()) {
          +            val fis = new FileInputStream(file);
          +            Future {
          +              val client: BlobClient = blobService.get.createClient()
          +              client.put(fis);
          +            }(context.dispatcher)
          +              .onComplete {
          +                case Success(value) =>
          +                  sender ! value
          +                  fis.close()
          +                case Failure(e) =>
          +                  sender ! e
          --- End diff --

          And here `akka.actor.Status.Failure`.
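The upload branch can be modeled in Java with `CompletableFuture.handle`, which runs once for either outcome. The sketch shows the stream being closed on both paths and the error being wrapped before it is sent back. It uses two hypothetical types:

- `Uploader` stands in for `BlobClient#put`.
- `FailureReply` stands in for `akka.actor.Status.Failure`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class UploadReplySketch {
    /** Hypothetical stand-in for akka.actor.Status.Failure. */
    static final class FailureReply {
        final Throwable cause;

        FailureReply(Throwable cause) {
            this.cause = cause;
        }
    }

    /** Hypothetical stand-in for BlobClient#put: consumes the stream and returns a key. */
    interface Uploader {
        String put(InputStream in) throws IOException;
    }

    static CompletableFuture<Void> uploadAndReply(
            InputStream in, Uploader uploader, Executor executor, Consumer<Object> reply) {
        return CompletableFuture
            .supplyAsync(() -> {
                try {
                    return uploader.put(in);
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            }, executor)
            .handle((key, error) -> {
                // Close the stream on success and on failure alike.
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Nothing useful to do if closing fails.
                }
                if (error == null) {
                    reply.accept(key);
                } else {
                    Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause()
                        : error;
                    // Wrap the exception so the asking side sees a failed future, not a normal reply.
                    reply.accept(new FailureReply(cause));
                }
                return null;
            });
    }
}
```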

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r96843719

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
          @@ -830,19 +830,24 @@ class TaskManager(
                       case StdOutFileRequest =>
                         new File(logFilePath.substring(0, logFilePath.length - 4) + ".out");
                     }
          -          val fis = new FileInputStream(file);
          -          Future {
          -            val client: BlobClient = blobService.get.createClient()
          -            client.put(fis);
          -          }(context.dispatcher)
          -            .onComplete {
          -              case Success(value) =>
          -                sender ! value
          -                fis.close()
          -              case Failure(e) =>
          -                sender ! e
          -                fis.close()
          +          if (file.exists()) {
          +            val fis = new FileInputStream(file);
          +            Future {
          +              val client: BlobClient = blobService.get.createClient()
          +              client.put(fis);
          +            }(context.dispatcher)
          +              .onComplete {
          +                case Success(value) =>
          +                  sender ! value
          +                  fis.close()
          +                case Failure(e) =>
          +                  sender ! e
          +                  fis.close()
          +              }(context.dispatcher)
          +          } else {
          +            sender ! new IOException("TaskManager log files are unavailable. " +
          --- End diff --

          The same here with `akka.actor.Status.Failure`.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r96843696

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
          @@ -821,7 +821,7 @@ class TaskManager(
                 val logFilePathOption = Option(config.configuration.getString(
                   ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")));
                 logFilePathOption match {
          -        case None => throw new IOException("TaskManager log files are unavailable. " +
          +        case None => sender ! new IOException("TaskManager log files are unavailable. " +
          --- End diff --

          In order to send an `Exception` over the wire to the sender, it has to be wrapped in `akka.actor.Status.Failure`. Otherwise it is sent as a normal response message.
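To illustrate the point, here is a small Java model of how an ask-style reply becomes a future. It uses a hypothetical `StatusFailure` class in place of `akka.actor.Status.Failure`. This is not Akka's implementation, only the rule the comment describes:

- A wrapped exception fails the future.
- A bare exception object is delivered as an ordinary successful result.

```java
import java.util.concurrent.CompletableFuture;

class AskSemanticsSketch {
    /** Hypothetical stand-in for akka.actor.Status.Failure. */
    static final class StatusFailure {
        final Throwable cause;

        StatusFailure(Throwable cause) {
            this.cause = cause;
        }
    }

    /** Completes the asker's future from a reply message. */
    static CompletableFuture<Object> completeFromReply(Object reply) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        if (reply instanceof StatusFailure) {
            // Only the wrapped form is treated as a failure.
            future.completeExceptionally(((StatusFailure) reply).cause);
        } else {
            // Anything else, including a bare Exception, is just a value.
            future.complete(reply);
        }
        return future;
    }
}
```

A caller that only checks for failed futures would therefore treat an unwrapped `IOException` reply as a successful log request.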

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2974

          @tillrohrmann I've addressed your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r97346693

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
          @@ -1074,6 +1080,79 @@ protected void run() {
          }};
          }

          + @Test
          + public void testLogNotFoundHandling() throws Exception {
          +
          + new JavaTestKit(system){{
          +
          + ActorGateway jobManager = null;
          + ActorGateway taskManager = null;
          +
          + final ActorGateway testActorGateway = new AkkaActorGateway(
          + getTestActor(),
          + leaderSessionID);
          +
          + try {
          + final IntermediateDataSetID resultId = new IntermediateDataSetID();
          +
          + // Create the JM
          + ActorRef jm = system.actorOf(Props.create(
          + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
          +
          + jobManager = new AkkaActorGateway(jm, leaderSessionID);
          +
          + final int dataPort = NetUtils.getAvailablePort();
          + Configuration config = new Configuration();
          + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
          + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
          +
          + taskManager = TestingUtils.createTaskManager(
          + system,
          + jobManager,
          + config,
          + false,
          + true);
          +
          + // ---------------------------------------------------------------------------------
          +
          + final ActorGateway tm = taskManager;
          + final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
          +
          + new Within(d) {
          + @Override
          + protected void run() {
          + Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
          +
          + logFuture.onSuccess(new OnSuccess<Object>() {
          + @Override
          + public void onSuccess(Object result) throws Throwable {
          + Assert.fail();
          + }
          + }, context);
          + logFuture.onFailure(new OnFailure() {
          + @Override
          + public void onFailure(Throwable failure) throws Throwable {
          + testActorGateway.tell(new Status.Success("success"));
          + }
          + }, context);
          --- End diff --

          Maybe `Await.result` is a bit more succinct.
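In Scala, `scala.concurrent.Await.result(future, timeout)` blocks until the future completes and rethrows its failure. A rough Java analogue keeps the assertion on the test thread. It is sketched here with `CompletableFuture.get(timeout, unit)` and a hypothetical `expectFailure` helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitStyleSketch {
    /**
     * Blocks on the future and returns its failure cause.
     * Throws AssertionError if the future completed successfully.
     */
    static Throwable expectFailure(CompletableFuture<?> future, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            return e.getCause();
        }
        throw new AssertionError("Expected the log request to fail.");
    }
}
```

In the callback version, `Assert.fail()` runs on the executor's thread, so the failure does not propagate to JUnit directly. A blocking wait reports the outcome on the thread where the test can assert on it.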

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r97346342

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
          @@ -1074,6 +1080,79 @@ protected void run() {
          }};
          }

          + @Test
          + public void testLogNotFoundHandling() throws Exception {
          +
          + new JavaTestKit(system){{
          +
          + ActorGateway jobManager = null;
          + ActorGateway taskManager = null;
          +
          + final ActorGateway testActorGateway = new AkkaActorGateway(
          + getTestActor(),
          + leaderSessionID);
          +
          + try {
          + final IntermediateDataSetID resultId = new IntermediateDataSetID();
          +
          + // Create the JM
          + ActorRef jm = system.actorOf(Props.create(
          + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
          +
          + jobManager = new AkkaActorGateway(jm, leaderSessionID);
          +
          + final int dataPort = NetUtils.getAvailablePort();
          + Configuration config = new Configuration();
          + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
          + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
          +
          + taskManager = TestingUtils.createTaskManager(
          + system,
          + jobManager,
          --- End diff --

          Maybe `ActorRef.noSender()` is enough here.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r97347534

          --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java ---
          @@ -0,0 +1,134 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.runtime.webmonitor.handlers;
          +
          +import io.netty.buffer.ByteBuf;
          +import io.netty.channel.ChannelHandlerContext;
          +import io.netty.handler.codec.http.DefaultFullHttpRequest;
          +import io.netty.handler.codec.http.HttpMethod;
          +import io.netty.handler.codec.http.HttpVersion;
          +import io.netty.handler.codec.http.router.Routed;
          +import org.apache.flink.api.common.time.Time;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.runtime.akka.AkkaUtils;
          +import org.apache.flink.runtime.blob.BlobKey;
          +import org.apache.flink.runtime.clusterframework.types.ResourceID;
          +import org.apache.flink.runtime.concurrent.CompletableFuture;
          +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
          +import org.apache.flink.runtime.instance.ActorGateway;
          +import org.apache.flink.runtime.instance.Instance;
          +import org.apache.flink.runtime.instance.InstanceID;
          +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
          +import org.apache.flink.runtime.messages.JobManagerMessages;
          +import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
          +import org.junit.Assert;
          +import org.junit.Test;
          +import org.mockito.invocation.InvocationOnMock;
          +import org.mockito.stubbing.Answer;
          +import scala.Option;
          +import scala.collection.JavaConverters;
          +import scala.concurrent.ExecutionContext$;
          +import scala.concurrent.Future$;
          +import scala.concurrent.duration.FiniteDuration;
          +
          +import java.io.IOException;
          +import java.util.Collections;
          +import java.util.HashMap;
          +import java.util.Map;
          +import java.util.concurrent.Executor;
          +import java.util.concurrent.atomic.AtomicReference;
          +
          +import static org.mockito.Matchers.any;
          +import static org.mockito.Matchers.isA;
          +import static org.powermock.api.mockito.PowerMockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +public class TaskManagerLogHandlerTest {
          + @Test
          + public void testLogFetchingFailure() throws Exception {
          + // ========= setup TaskManager =================================================================================
          + InstanceID tmID = new InstanceID();
          + ResourceID tmRID = new ResourceID(tmID.toString());
          + TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
          + when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
          +
          + Instance taskManager = mock(Instance.class);
          + when(taskManager.getId()).thenReturn(tmID);
          + when(taskManager.getTaskManagerID()).thenReturn(tmRID);
          + when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
          + CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
          + future.completeExceptionally(new IOException("failure"));
          + when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
          +
          + // ========= setup JobManager ==================================================================================
          +
          + ActorGateway jobManagerGateway = mock(ActorGateway.class);
          + Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
          + JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
          +
          + when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
          + .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
          + when(jobManagerGateway.ask(isA(JobManagerMessages.getRequestBlobManagerPort().getClass()), any(FiniteDuration.class)))
          + .thenReturn(Future$.MODULE$.successful((Object) 5));
          + when(jobManagerGateway.ask(isA(JobManagerMessages.RequestTaskManagerInstance.class), any(FiniteDuration.class)))
          + .thenReturn(Future$.MODULE$.successful((Object) new JobManagerMessages.TaskManagerInstance(Option.apply(taskManager))));
          + when(jobManagerGateway.path()).thenReturn("/jm/address");
          +
          + JobManagerRetriever retriever = mock(JobManagerRetriever.class);
          + when(retriever.getJobManagerGatewayAndWebPort())
          + .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
          +
          +
          + TaskManagerLogHandler handler = new TaskManagerLogHandler(
          + retriever,
          + ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor()),
          + Future$.MODULE$.successful("/jm/address"),
          + AkkaUtils.getDefaultClientTimeout(),
          + TaskManagerLogHandler.FileMode.LOG,
          + new Configuration(),
          + false);
          +
          + final AtomicReference<String> exception = new AtomicReference<>();
          +
          + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
          + when(ctx.write(isA(ByteBuf.class))).thenAnswer(new Answer<Object>() {
          + @Override
          + public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
          + ByteBuf data = invocationOnMock.getArgumentAt(0, ByteBuf.class);
          + exception.set(new String(data.array()));
          + return null;
          + }
          + });
          +
          + Map<String, String> pathParams = new HashMap<>();
          + pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, tmID.toString());
          + Routed routed = mock(Routed.class);
          + when(routed.pathParams()).thenReturn(pathParams);
          + when(routed.request()).thenReturn(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + tmID + "/log"));
          +
          + handler.respondAsLeader(ctx, routed, jobManagerGateway);
          +
          + Assert.assertEquals("Fetching TaskManager log failed.", exception.get());
          + }
          +
          + public class CurrentThreadExecutor implements Executor {
          + public void execute(Runnable r) {
          + r.run();
          + }
          + }
          — End diff –

          `Executors.directExecutor` could help.
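A minimal sketch of what such a direct executor does, assuming the reviewer means a shared utility such as Flink's `org.apache.flink.runtime.concurrent.Executors.directExecutor()` (not verified against this exact Flink version). The class `DirectExecutorSketch` below is hypothetical and is not Flink's code. It follows the same pattern the `java.util.concurrent.Executor` Javadoc calls `DirectExecutor`: each task runs synchronously on the calling thread, which is what the test's hand-written `CurrentThreadExecutor` does.

```java
import java.util.concurrent.Executor;

// Hypothetical standalone equivalent of a "direct executor" (not Flink's implementation):
// every task runs synchronously on the thread that calls execute().
public final class DirectExecutorSketch {

    // A stateless enum singleton, so all callers can share one instance.
    private enum DirectExecutor implements Executor {
        INSTANCE;

        @Override
        public void execute(Runnable command) {
            // No hand-off to another thread: run the task right here.
            command.run();
        }
    }

    private DirectExecutorSketch() {}

    /** Returns a shared executor that runs tasks on the caller's thread. */
    public static Executor directExecutor() {
        return DirectExecutor.INSTANCE;
    }

    public static void main(String[] args) {
        final Thread caller = Thread.currentThread();
        final boolean[] sameThread = new boolean[1];
        directExecutor().execute(() -> sameThread[0] = Thread.currentThread() == caller);
        System.out.println("ran on caller thread: " + sameThread[0]);
    }
}
```

With a helper like this, the test could pass `ExecutionContext$.MODULE$.fromExecutor(directExecutor())` to the `TaskManagerLogHandler` and drop `CurrentThreadExecutor`. That inner class is declared without `static`, so it also carries a hidden reference to the enclosing test instance, which a shared singleton avoids.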

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r97346835

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java —
          @@ -1074,6 +1080,79 @@ protected void run() {
          }};
          }

          + @Test
          + public void testLogNotFoundHandling() throws Exception {
          +
          + new JavaTestKit(system){{
          +
          + ActorGateway jobManager = null;
          + ActorGateway taskManager = null;
          +
          + final ActorGateway testActorGateway = new AkkaActorGateway(
          + getTestActor(),
          + leaderSessionID);
          +
          + try {
          + final IntermediateDataSetID resultId = new IntermediateDataSetID();
          +
          + // Create the JM
          + ActorRef jm = system.actorOf(Props.create(
          + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
          +
          + jobManager = new AkkaActorGateway(jm, leaderSessionID);
          +
          + final int dataPort = NetUtils.getAvailablePort();
          + Configuration config = new Configuration();
          + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
          + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
          +
          + taskManager = TestingUtils.createTaskManager(
          + system,
          + jobManager,
          + config,
          + false,
          + true);
          +
          + // ---------------------------------------------------------------------------------
          +
          + final ActorGateway tm = taskManager;
          + final ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(Executors.newSingleThreadExecutor());
          +
          + new Within(d) {
          + @Override
          + protected void run() {
          + Future<Object> logFuture = tm.ask(TaskManagerMessages.getRequestTaskManagerLog(), timeout);
          +
          + logFuture.onSuccess(new OnSuccess<Object>() {
          + @Override
          + public void onSuccess(Object result) throws Throwable {
          + Assert.fail();
          + }
          + }, context);
          + logFuture.onFailure(new OnFailure() {
          + @Override
          + public void onFailure(Throwable failure) throws Throwable {
          + testActorGateway.tell(new Status.Success("success"));
          + }
          + }, context);
          +
          + Status.Success msg = expectMsgClass(Status.Success.class);
          + Assert.assertEquals("success", msg.status());
          + }
          + };
          + }
          + catch(Exception e) {
          + e.printStackTrace();
          + fail(e.getMessage());
          — End diff –

          Better to let the exception bubble up. Less code.
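To illustrate the comment, here is a hypothetical, self-contained comparison; `BubbleUpSketch` and its helpers are invented for illustration and no JUnit is involved. Catching the exception and calling `fail(e.getMessage())`, as the diff does, replaces the original exception with a bare `AssertionError` that has no cause (the trace only reaches stderr via `printStackTrace()`). A test method that is already declared `throws Exception` can simply let the original exception propagate to the runner.

```java
import java.io.IOException;

// Hypothetical illustration: what a test runner sees with "catch + fail(...)"
// versus a test method that just declares "throws Exception".
public final class BubbleUpSketch {

    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Stand-in for failing test setup, e.g. starting a TaskManager (hypothetical).
    static void setUp() throws IOException {
        throw new IOException("could not start TaskManager");
    }

    // Pattern from the diff: catch, print, and convert to an assertion failure.
    public static void catchAndFail() {
        try {
            setUp();
        } catch (Exception e) {
            e.printStackTrace();
            // Mirrors JUnit 4's fail(String): only the message survives, the cause is dropped.
            throw new AssertionError(e.getMessage());
        }
    }

    // Suggested pattern: let the original exception propagate unchanged.
    public static void letItBubbleUp() throws Exception {
        setUp();
    }

    // Minimal "runner": returns the throwable a test framework would report, or null.
    public static Throwable run(ThrowingRunnable test) {
        try {
            test.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable a = run(BubbleUpSketch::catchAndFail);
        Throwable b = run(BubbleUpSketch::letItBubbleUp);
        System.out.println("catch + fail reports: " + a.getClass().getSimpleName() + ", cause = " + a.getCause());
        System.out.println("bubble up reports:    " + b.getClass().getSimpleName());
    }
}
```

Besides being shorter, the second form keeps the real failure (here an `IOException`) as the reported test error.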

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2974

          @tillrohrmann I've addressed your comments. Will merge this once Travis passes.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2974#discussion_r97354396

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java —
          @@ -1074,6 +1080,79 @@ protected void run() {
          }};
          }

          + @Test
          + public void testLogNotFoundHandling() throws Exception {
          +
          + new JavaTestKit(system){{
          +
          + ActorGateway jobManager = null;
          + ActorGateway taskManager = null;
          +
          + final ActorGateway testActorGateway = new AkkaActorGateway(
          + getTestActor(),
          + leaderSessionID);
          +
          + try {
          + final IntermediateDataSetID resultId = new IntermediateDataSetID();
          +
          + // Create the JM
          + ActorRef jm = system.actorOf(Props.create(
          + new SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
          +
          + jobManager = new AkkaActorGateway(jm, leaderSessionID);
          +
          + final int dataPort = NetUtils.getAvailablePort();
          + Configuration config = new Configuration();
          + config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
          + config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
          + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
          +
          + taskManager = TestingUtils.createTaskManager(
          + system,
          + jobManager,
          — End diff –

          Without a JobManager, no BlobService is started, so the TM would fail earlier than we want it to.

          While trying it out, I found another exception that should be wrapped, though :>
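The point of the change is that an exception such as the `FileNotFoundException` from the issue description should become a failed response instead of escaping and crashing the TaskManager actor. A minimal sketch of that wrapping, assuming a future-based reply: `WrapLogFailureSketch.requestLog` is hypothetical, reading the file stands in for uploading it, and it uses `java.util.concurrent.CompletableFuture` rather than Flink's own future types or actor messages.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: report a missing or unreadable log file through the returned
// future instead of throwing, so the caller (not the handler's thread) sees the failure.
public final class WrapLogFailureSketch {

    /** Reads the log file and completes with its size, or completes exceptionally on error. */
    public static CompletableFuture<Long> requestLog(String logPath) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (logPath == null) {
            // FileInputStream(null) would throw NullPointerException, so check explicitly.
            result.completeExceptionally(new IOException("TaskManager log path is not set."));
            return result;
        }
        // The catch clause also covers FileNotFoundException thrown while opening the resource.
        try (InputStream in = new FileInputStream(logPath)) {
            long size = 0;
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                size += n;
            }
            result.complete(size);
        } catch (IOException e) {
            // Wrap with context and keep the original exception as the cause.
            result.completeExceptionally(new IOException("Could not read TaskManager log: " + logPath, e));
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Long> f = requestLog("/i/dont/exist");
        try {
            f.get();
            System.out.println("unexpected success");
        } catch (ExecutionException e) {
            System.out.println("request failed: " + e.getCause().getMessage());
        }
    }
}
```

Assuming `/i/dont/exist` is absent, the caller observes a failed future, which is the outcome `testLogNotFoundHandling` waits for in its `onFailure` callback.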

          githubbot ASF GitHub Bot added a comment -

          Github user zentol closed the pull request at:

          https://github.com/apache/flink/pull/2974

          Zentol Chesnay Schepler added a comment -

          master: b8fa8c65638e8749f9ad994da8ec69a6c34df029
          1.2: f791024eda9ae9d9e3a073754dcb71684d98cb04


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              Makman2 Mischa Krüger
            • Votes:
              0
              Watchers:
              2
