Details
-
Bug
-
Status: Resolved
-
P1
-
Resolution: Workaround
-
2.8.0, 2.9.0
-
Tested on Ubuntu 18
Beam 2.8
Tested with Flink:
1) [local]
2) Cluster inside a K8S cluster on minikube
3) Cluster inside a K8S cluster on GCP
Tested using Cassandra [cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]:
1) In a local container
2) Cluster inside a K8S cluster on minikube
3) Cluster inside a K8S cluster on GCP
Tested on Ubuntu 18 Beam 2.8 Tested with Flink: 1) [local] 2) Cluster inside a K8S cluster on minikube 3) Cluster inside a K8S cluster on GCP Tested using Cassandra [cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]: 1) In a local container 2) Cluster inside a K8S cluster on minikube 3) Cluster inside a K8S cluster on GCP
Description
Can't make a simple join on two Cassandra tables when using FlinkRunner.
The same code works with a DirectRunner fails when used with FlinkRunner giving these (as well as many other) errors:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatchere1f5abe7-6299-43ea-9182-24a2193e078f#-1757043920]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) at java.lang.Thread.run(Thread.java:748)
The code can be found here
Steps to reproduce:
- Clone the repo to a linux (I;m on Ubuntu 18 but any *nix system would probably work - i.e. repl.it)
- Follow the README to set up a Cassandra container + schema
- Run with
gradle --console=plain join-from-cassandra -Drunner=flink > output/build.log 2>&1
to use FlinkRunner. See error in log at ./output/build.log
- Run with
gradle --console=plain join-from-cassandra -Drunner=direct > output/build.log 2>&1
to use DirectRunner. See error in log at ./output/build.log