Details
Type: Bug
Status: Closed
Priority: Critical
Resolution: Abandoned
Affects Version/s: 1.5.3, 1.6.0
None
None
Environment: 4 TaskManagers and 1 JobManager for now, on 1.6.0
Description
Hi!
I've started deploying a small Flink cluster (4 TaskManagers and 1 JobManager for now, on 1.6.0) and have deployed a small job on it. Because of the current load, the job is handled entirely by a single TaskManager (tm). I've written a small proxy that uses QueryableStateClient to access the current state. It works nicely, except in certain circumstances: it seems that I can only access the state through a TaskManager that runs at least part of the job. Here's an example:
- Job on tm1, QueryableStateClient pointing to tm1: state accessible.
- Job still on tm1, QueryableStateClient pointing to tm2 (for example): state inaccessible.
- Killed tm1, so the job is now on tm2: state accessible.
- Job still on tm2, QueryableStateClient pointing to tm3: state inaccessible.
- Increased the parallelism so the job is spread over tm1 and tm2: QueryableStateClient pointing to either tm1 or tm2 works.
- Job still on tm1 and tm2, QueryableStateClient pointing to tm3: state inaccessible.
When the state is inaccessible, I get the following exception (thrown in KvStateClientProxyHandler.getKvStateLookupInfo):
java.lang.RuntimeException: Failed request 0.
Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I went through the code on the master branch a bit. The class KvStateClientProxy holds kvStateLocationOracle, the key-value state location oracle for a given JobID. Here is how it gets updated (each method is called from the one on the next line):
- updateKvStateLocationOracle() is called from registerQueryableState() (TaskExecutor.java)
- registerQueryableState() is called from associateWithJobManager() (TaskExecutor.java)
- associateWithJobManager() is called from establishJobManagerConnection() (TaskExecutor.java)
- establishJobManagerConnection() is called from jobManagerGainedLeadership() (TaskExecutor.java)
- jobManagerGainedLeadership() is called from onRegistrationSuccess() (JobLeaderService.java)
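So the KvStateLocationOracle map seems to be updated only when the TaskManager is part of the job, i.e. when it runs some of the job's tasks and has therefore registered with that job's JobManager. A proxy on any other TaskManager has no oracle for the JobID, which would explain the UnknownLocationException above.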
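The suspected mechanism can be illustrated with a heavily simplified, hypothetical model in plain Java. LocationOracle, ProxyModel and TaskManagerModel are made-up names, not Flink classes, onTaskOfJobDeployed stands in for the whole TaskExecutor/JobLeaderService call chain listed above, and IllegalStateException stands in for UnknownLocationException. The only thing it shows is that a proxy whose per-job map was never populated cannot answer a lookup, no matter where the state actually lives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model (not Flink code): maps a queryable state name
// to the TaskManager that holds it.
final class LocationOracle {
    private final Map<String, String> stateToTaskManager;

    LocationOracle(Map<String, String> stateToTaskManager) {
        this.stateToTaskManager = new HashMap<>(stateToTaskManager);
    }

    Optional<String> locate(String stateName) {
        return Optional.ofNullable(stateToTaskManager.get(stateName));
    }
}

// Hypothetical stand-in for the client proxy running on one TaskManager:
// it keeps one oracle per job ID.
final class ProxyModel {
    private final Map<String, LocationOracle> oraclesByJob = new HashMap<>();

    void updateLocationOracle(String jobId, LocationOracle oracle) {
        oraclesByJob.put(jobId, oracle);
    }

    String lookup(String jobId, String stateName) {
        LocationOracle oracle = oraclesByJob.get(jobId);
        if (oracle == null) {
            // No oracle was ever registered for this job on this TaskManager.
            throw new IllegalStateException(
                "Could not retrieve location of state=" + stateName + " of job=" + jobId);
        }
        return oracle.locate(stateName).orElseThrow(
            () -> new IllegalStateException("Unknown state " + stateName));
    }
}

// Hypothetical TaskManager: its proxy only learns about a job when one of
// that job's tasks is deployed here.
final class TaskManagerModel {
    final String name;
    final ProxyModel proxy = new ProxyModel();

    TaskManagerModel(String name) {
        this.name = name;
    }

    void onTaskOfJobDeployed(String jobId, LocationOracle oracle) {
        proxy.updateLocationOracle(jobId, oracle);
    }
}
```

With the job deployed on tm1 only, a lookup through tm1's proxy returns "tm1", while the same lookup through tm2's proxy throws, mirroring the first two scenarios in the description.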
For now, we query every TaskManager, collect the responses in a List<CompletableFuture<...>>, and take the first future that completes successfully, but that is only a workaround.
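A possible shape for such a "first successful future" combinator, sketched with plain JDK futures. FirstSuccessful is a hypothetical helper, not a JDK or Flink API. CompletableFuture.anyOf is not enough on its own here, because it completes with whichever future finishes first, including one that fails (for example with UnknownLocationException from a TaskManager that does not run the job).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of the JDK or Flink): completes with the value
// of the first future that completes normally, or exceptionally with the last
// failure if every future fails.
final class FirstSuccessful {
    private FirstSuccessful() {}

    static <T> CompletableFuture<T> of(List<? extends CompletableFuture<? extends T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no futures given"));
            return result;
        }
        // Counts how many futures are still allowed to fail before we give up.
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<? extends T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    // complete() is a no-op if another future already succeeded.
                    result.complete(value);
                } else if (remaining.decrementAndGet() == 0) {
                    // Every future failed: surface the last failure.
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }
}
```

The remaining futures are not cancelled once one succeeds, so every per-TaskManager request still runs to completion in the background.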