Flink / FLINK-5773

Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.3.0, 1.2.1
    • Labels:
      None

      Description

      The exception below occurs when I call StreamExecutionEnvironment.setMaxParallelism() with any value less than 4.

      Let me know if you need more information.

      Caused by: java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
      	at java.lang.Class.cast(Class.java:3369)
      	at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
      	at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
      	at scala.util.Try$.apply(Try.scala:161)
      	at scala.util.Success.map(Try.scala:206)
      	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
      	at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
      	at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
      	at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
      	at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
      	at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
      	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      	at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
      	at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
      	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
      	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
      	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
      	at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1206)
      	at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:458)
      	at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:280)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	... 5 more
      

          Activity

          aljoscha Aljoscha Krettek added a comment -

          The exception you're seeing is actually masking another exception. Somewhere down there we get this:

          java.lang.IllegalArgumentException: Max parallelism must be >= than parallelism.
          

          I guess your parallelism is 1 on that local cluster, that's the reason for that exception.

          This is definitely a bug, thanks for reporting!

          sunjincheng121 sunjincheng added a comment - - edited

          Hi Colin Breame, let's look at the Scaladoc for `setMaxParallelism`:

           Sets the maximum degree of parallelism defined for the program.
           The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
           defines the number of key groups used for partitioned state.
          

          The value passed to `setMaxParallelism(valueA)` is tied to the value passed to `setParallelism(valueB)`: it requires valueA >= valueB. The default parallelism in your local environment may be 4, which would require valueA >= 4. If you call `env.setParallelism(1)` first, you should then be able to call `setMaxParallelism` with any number greater than 0. Can you try it?
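
The constraint described in this comment can be sketched in plain Java. This is an illustrative stand-in, not Flink's actual validation code: the class `ParallelismCheck` and the method `checkMaxParallelism` are hypothetical names, the default parallelism of 4 is the assumption made in the comment, and only the exception message is taken from this thread.

```java
public class ParallelismCheck {

    // Hypothetical helper mirroring the rule maxParallelism >= parallelism.
    static void checkMaxParallelism(int parallelism, int maxParallelism) {
        if (maxParallelism < parallelism) {
            throw new IllegalArgumentException(
                    "Max parallelism must be >= than parallelism.");
        }
    }

    public static void main(String[] args) {
        // Assumed default parallelism of 4: a max parallelism of 4 is accepted.
        checkMaxParallelism(4, 4);

        // With parallelism lowered to 1, a smaller max parallelism is accepted too.
        checkMaxParallelism(1, 2);

        // A max parallelism below the parallelism is rejected.
        try {
            checkMaxParallelism(4, 2);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under these assumptions, the sketch matches the report: with a parallelism of 4, any max parallelism below 4 is rejected.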

          till.rohrmann Till Rohrmann added a comment -

          I think the problem is that we use the wrong Failure class to signal an exception in the submitTask call. When using Akka's AskSupport trait, the ! operator expects an akka.actor.Status.Failure message to indicate a failure. But we're giving it a scala.util.Failure instance. Internally the akka.actor.Status.Failure will be unwrapped and wrapped in a scala.util.Failure, which is then sent to the receiver. Alternatively, we could also use the ActorRef.tell method, which does not do such re-wrapping magic.
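
The masking effect described here can be reproduced by analogy with Java's `CompletableFuture`. This is a rough sketch, not Akka or Flink code: `MaskedFailureDemo`, `Acknowledge`, `FailureValue`, `expectAck`, and `observedError` are hypothetical stand-ins. It only shows that a failure object delivered as an ordinary reply value surfaces as a `ClassCastException` once the caller casts the reply, whereas failing the future itself preserves the real exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MaskedFailureDemo {

    // Hypothetical stand-ins for the expected reply and for a failure wrapper object.
    static final class Acknowledge {}

    static final class FailureValue {
        final Throwable cause;
        FailureValue(Throwable cause) { this.cause = cause; }
    }

    // Casts the reply to the expected type, similar in spirit to the
    // Class.cast call at the top of the stack trace.
    static CompletableFuture<Acknowledge> expectAck(CompletableFuture<Object> reply) {
        return reply.thenApply(Acknowledge.class::cast);
    }

    // Returns the exception the caller observes, or null on success.
    static Throwable observedError(CompletableFuture<Acknowledge> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Exception real = new IllegalArgumentException(
                "Max parallelism must be >= than parallelism.");

        // Buggy pattern: the failure travels as a successful reply value,
        // so the cast fails and hides the real cause.
        CompletableFuture<Object> masked =
                CompletableFuture.completedFuture(new FailureValue(real));
        System.out.println(observedError(expectAck(masked)));

        // Fixed pattern: the future itself is failed, so the real cause surfaces.
        CompletableFuture<Object> failed = new CompletableFuture<>();
        failed.completeExceptionally(real);
        System.out.println(observedError(expectAck(failed)));
    }
}
```

In this analogy, the first future corresponds to replying with scala.util.Failure and the second to replying with akka.actor.Status.Failure.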

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3321

          FLINK-5773 Use akka.actor.Status.Failure class to send failures via AskSupport

          Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
          to be recognized. Internally the trait will unwrap the failure and wrap it in a
          scala.util.Failure instance. However, it does not recognize the scala Failure when given
          to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
          scala.util.Success instance.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink FLINK-5773

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3321.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3321


          commit e7e0431503f53fe9b8f36971a18d07ab0d16faf4
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-02-15T13:16:26Z

          FLINK-5773 Use akka.actor.Status.Failure class to send failures via AskSupport

          Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
          to be recognized. Internally the trait will unwrap the failure and wrap it in a
          scala.util.Failure instance. However, it does not recognize the scala Failure when given
          to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
          scala.util.Success instance.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3321

          Good fix, +1 to merge this

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3324

          [backport] FLINK-5773 Use akka.actor.Status.Failure class to send failures via AskSupport

          Backport of #3321 onto the `release-1.2` branch.

          Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
          to be recognized. Internally the trait will unwrap the failure and wrap it in a
          scala.util.Failure instance. However, it does not recognize the scala Failure when given
          to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
          scala.util.Success instance.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink FLINK-5773backport

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3324.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3324


          commit 322c5ada58f407939a375c42c6f4e134f3527cde
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-02-15T13:16:26Z

          FLINK-5773 Use akka.actor.Status.Failure class to send failures via AskSupport

          Akka's AskSupport trait requires that failures are wrapped in an akka.actor.Status.Failure
          to be recognized. Internally the trait will unwrap the failure and wrap it in a
          scala.util.Failure instance. However, it does not recognize the scala Failure when given
          to the AskSupport trait. As a consequence it would wrap scala.util.Failure in a
          scala.util.Success instance.


          aljoscha Aljoscha Krettek added a comment -

          I opened another issue for the parallelism/max-parallelism issue that was masked by this issue: FLINK-5808

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3324

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3321

          Thanks for the review @StephanEwen. Tests have passed on Travis. Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3324

          Thanks for your review @StephanEwen. Travis has passed. Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3321

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3324

          till.rohrmann Till Rohrmann added a comment -

          1.3.0: 413609d13fcf924fa8581450618bccc6abdbbda0
          1.2.1: a2853ec1527dd848d635d9cb7720b2bd21c7e3aa


            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              colinbreame Colin Breame
            • Votes:
              0
              Watchers:
              5
