Details
- Type: New Feature
- Status: Reopened
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 2.4.0, 3.0.0
- Fix Version/s: None
- Component/s: None
Description
Spark's BlacklistTracker maintains a list of "bad nodes" which it will not use for running tasks (e.g., because of bad hardware). When running on YARN, this blacklist is used to avoid ever allocating resources on blacklisted nodes: https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128
I'm just beginning to poke around the Kubernetes code, so apologies if this is incorrect, but I didn't see any references to scheduler.nodeBlacklist() in KubernetesClusterSchedulerBackend, so it seems this is missing. Thought of this while looking at SPARK-19755, a similar issue on Mesos.
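For context, here is a minimal, self-contained sketch of the pattern the linked YARN code follows. The names here (NodeAwareRequest, prepareRequest) are hypothetical stand-ins, not the real message classes; the real backend pulls the blacklist via scheduler.nodeBlacklist() on the task scheduler.
{code:scala}
// Hypothetical sketch of the YARN-style pattern: fetch the nodes Spark currently
// considers bad and forward them with each resource request, so the cluster
// manager never allocates containers (or, for Kubernetes, schedules executor pods)
// on those nodes. NodeAwareRequest stands in for the real RequestExecutors message.
case class NodeAwareRequest(
    requestedTotal: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String])

def prepareRequest(
    requestedTotal: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String]): NodeAwareRequest = {
  // Drop locality preferences that point at blacklisted hosts, and pass the
  // blacklist along so those nodes are avoided entirely.
  val filteredLocality = hostToLocalTaskCount.filter {
    case (host, _) => !nodeBlacklist.contains(host)
  }
  NodeAwareRequest(requestedTotal, filteredLocality, nodeBlacklist)
}
{code}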
Issue Links
- is related to
  - SPARK-19755 Blacklist is always active for MesosCoarseGrainedSchedulerBackend. As result - scheduler cannot create an executor after some time. (Resolved)
  - SPARK-16630 Blacklist a node if executors won't launch on it. (Resolved)
Activity
I'm not sure if node blacklisting applies to Kubernetes. In the Kubernetes mode, executors run in containers that in turn run in Kubernetes pods scheduled to run on available cluster nodes by the Kubernetes scheduler. The Kubernetes Spark scheduler backend does not keep track of nor really care about which nodes the pods run on. This is a concern of the Kubernetes scheduler.
When an executor fails, all cases are covered via handleDisconnectedExecutors, which is scheduled at some rate and calls removeExecutor in CoarseGrainedSchedulerBackend, which updates the blacklist info. When we want to launch new executors, CoarseGrainedSchedulerBackend will terminate an executor that has already started on a blacklisted node. IMHO the Kubernetes Spark scheduler should filter nodes and constrain where pods are launched, since it already knows that some nodes are not an option. For example, this could be done with this feature (also related to https://github.com/kubernetes/kubernetes/issues/14573) in cases where node problems appear.
The Kubernetes scheduler backend simply creates executor pods through the Kubernetes API server, and the pods are scheduled by the Kubernetes scheduler to run on the available nodes. The scheduler backend is not interested in, nor should it know about, the mapping from pods to nodes. Affinity and anti-affinity, or taints and tolerations, can be used to influence pod scheduling. But it's the responsibility of the Kubernetes scheduler and the kubelets to keep track of node problems and avoid scheduling pods onto problematic nodes.
Does the scheduler know all the reasons an app might want to define a node as problematic (and blacklisted, in our case)? Is there a pointer to the node problems it detects?
It seems to me that this is the reason taints were introduced (I might be wrong).
Could the kubelet even detect the scenario for which blacklisting was introduced?
https://blog.cloudera.com/blog/2017/04/blacklisting-in-apache-spark/
Yeah, I don't think it's safe to assume that it's Kubernetes' responsibility to entirely figure out the equivalent of a Spark application's internal blacklist. You can't guarantee that it will detect hardware issues, and the issue might also be specific to the Spark application (e.g., a missing jar).
YARN has some basic detection of bad nodes as well, but we observed cases in production where, without Spark's blacklisting, one bad disk would effectively take out an entire application on a large cluster, because many task failures could pile up very quickly.
That said, the existing blacklist implementation in Spark already handles that case, even without the extra handling I'm proposing here. The Spark app would still have its own node blacklist and would avoid scheduling tasks on that node.
However, this is suboptimal because Spark isn't really getting as many resources as it should. E.g., it requests 10 executors and Kubernetes hands it 10, but Spark can really only use 8 of them because 2 live on a blacklisted node.
I don't think this can be directly handled with taints, if I understand correctly. I assume applying a taint is an admin-level operation? That would mean a Spark app couldn't dynamically apply a taint when it discovers a problem on a node (and really, it probably shouldn't be able to, as the cluster shouldn't trust an arbitrary user). Furthermore, taints don't allow the decision to be application-specific: blacklisting is really just a heuristic, and you probably do not want it applied across applications. It's not clear what you'd do with multiple apps that each have their own blacklist, as nodes go into and out of each app's blacklist at different times.
In the YARN case, yes, it's possible that a node is missing a jar commonly needed by applications. In the Kubernetes mode, this will never be the case, because either all containers have a particular jar locally or none of them has it. An image missing a dependency is problematic by itself. This consistency is one of the benefits of being containerized. As for node problems, detecting them and avoiding scheduling pods onto problematic nodes are the concerns of the kubelets and the scheduler. Applications should not need to worry about whether nodes are healthy or not. Node problems that happen at runtime cause pods to be evicted from the problematic nodes and rescheduled somewhere else. Having applications keep track of problematic nodes and maintain a blacklist means unnecessarily jumping into the business of the kubelets and the scheduler.
While I mostly think that Kubernetes would be better suited to make the decision to blacklist nodes, I think we will see that there are reasons to consider nodes problematic beyond just the kubelet health checks, so using Spark's blacklisting sounds like a good idea to me.
Tainting nodes isn't the right solution, given that this is one Spark application's notion of a blacklist and we don't want it applied at the cluster level. We could, however, use node anti-affinity to communicate that blacklist and ensure that the application's executors avoid those nodes.
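To make that concrete, here is a minimal sketch of expressing the blacklist as required node anti-affinity on the executor pod, assuming the fabric8 Kubernetes client model; withNodeBlacklist, executorPod, and blacklistedNodes are hypothetical names, not existing backend code.
{code:scala}
import io.fabric8.kubernetes.api.model.{Affinity, AffinityBuilder, NodeSelectorTermBuilder, Pod, PodBuilder}

// Hypothetical helper: turn Spark's current node blacklist into a required
// node-affinity rule ("NotIn" on the hostname label) so the Kubernetes scheduler
// never places this executor pod on those nodes.
def withNodeBlacklist(executorPod: Pod, blacklistedNodes: Set[String]): Pod = {
  if (blacklistedNodes.isEmpty) {
    executorPod
  } else {
    val term = new NodeSelectorTermBuilder()
      .addNewMatchExpression()
        .withKey("kubernetes.io/hostname")
        .withOperator("NotIn")
        .withValues(blacklistedNodes.toSeq: _*)
      .endMatchExpression()
      .build()

    val affinity: Affinity = new AffinityBuilder()
      .withNewNodeAffinity()
        .withNewRequiredDuringSchedulingIgnoredDuringExecution()
          .withNodeSelectorTerms(term)
        .endRequiredDuringSchedulingIgnoredDuringExecution()
      .endNodeAffinity()
      .build()

    new PodBuilder(executorPod)
      .editOrNewSpec()
        .withAffinity(affinity)
      .endSpec()
      .build()
  }
}
{code}
Since the rule only lives in that application's executor pod specs, the per-application scoping discussed above falls out naturally.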
OK, the missing jar was a bad example on Kubernetes... I still wouldn't be surprised if there is some app-specific failure mode we're failing to take into account.
I think you are too confident in Kubernetes' ability to detect problems with nodes. I don't know what it does, but I don't think it's possible for it to handle this. It would be great if we really could rely on the separation of concerns you want; in practice that just doesn't work, because the app has more info.
It almost sounds like you think Spark should not use any internal blacklisting at all with Kubernetes. From experience with large non-Kubernetes deployments, I think that is a bad idea.
It's not that I'm too confident in Kubernetes' ability to detect node problems. I just don't see worrying about node problems at the application level as good practice in a containerized environment running on a container orchestration system. For that reason, yes, I don't think Spark on Kubernetes should really need to worry about blacklisting nodes.
How about locality preferences plus a hardware problem, like the disk problem? I see code in the Spark Kubernetes scheduler related to locality (not sure if it is complete). Will that problem be detected, and will the Kubernetes scheduler consider the node problematic? If so, then I guess there is no need for blacklisting in such scenarios. If it cannot be detected, though, and the task keeps failing but there is a locality preference, what will happen? Kubernetes should not just re-run things elsewhere simply because there was a failure; the reason for the failure matters. Is it an app failure or something lower level? (I am new to Kubernetes, so I missed the fact that taints are applied cluster-wide; we just need some similar feature, such as the node anti-affinity already mentioned.)
Stavros, we do currently differentiate between Kubernetes causing an executor to disappear (node failure) and an exit caused by the application itself.
Here's some detail on node issues and k8s:
The node-level problem detection is split between the kubelet and the Node Problem Detector. This works for some common errors and, in the future, will taint nodes upon detecting them. Some of these errors are listed here. However, there are some categories of errors this setup won't detect. For example: a node whose firewall rules/networking prevent an executor running on it from accessing a particular external service, say, to download/stream data; or a node with local disk issues that make a Spark executor on it throw read/write errors. These error conditions may only affect certain kinds of pods on that node and not others.
Yinan's point, I think, is that it is uncommon for applications on Kubernetes to try to incorporate reasoning about node-level conditions. I think this is because the general expectation is that a failure on a given node will just cause new executors to spin up on different nodes and eventually the application will succeed. However, I can see this being an issue in large-scale production deployments, where we'd see transient errors like the above. Given the existence of a blacklist mechanism and anti-affinity primitives, I don't think it would be too complex to incorporate.
I think this is because the general expectation is that a failure on a given node will just cause new executors to spin up on different nodes and eventually the application will succeed.
I think this is the part that may be particularly different in Spark. Some types of failures do not cause the executor to die; it's just a task failure, and the executor itself is still alive. As long as Spark gets heartbeats from the executor, it figures it's still fine. But a bad disk can cause tasks to fail repeatedly. The same could be true for other resources, e.g., a bad GPU, where perhaps only certain tasks use the GPU.
When that happens, without Spark's internal blacklisting, an application will very quickly hit many task failures. A task fails, Spark notices that, tries to find a place to assign the failed task, and puts it back in the same place; repeat until Spark decides there are too many failures and gives up. It can easily cause your app to fail in ~1 second. There is no communication with the cluster manager through this process; it's all just between Spark's driver and executors. In one case, when this happened, YARN's own health checker discovered the problem a few minutes after it occurred, but the Spark app had already failed by that point. All from one bad disk in a cluster with > 1000 disks.
Spark's blacklisting is really meant to be complementary to the type of node health checks you are talking about in Kubernetes. The blacklisting in Spark intentionally does not try to figure out the root cause of the problem, as we don't want to get into the game of enumerating all of the possibilities. It's a heuristic that makes it safe for Spark to keep going in the face of these uncaught errors, but then retries the resources when it should be safe to do so. (Discussed in more detail in the design doc on SPARK-8425.)
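For reference, the heuristic is controlled by the spark.blacklist.* settings; a minimal sketch of turning it on (the values here are illustrative, not recommendations):
{code:scala}
import org.apache.spark.SparkConf

// Illustrative values only; see the SPARK-8425 design doc and the configuration
// docs for the full list of spark.blacklist.* options and their defaults.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  // Task-level: how many attempts of one task may fail on an executor / a node
  // before that executor / node is blacklisted for the task.
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
  // Application-level: blacklist a node once enough of its executors are blacklisted.
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
  // Blacklisted executors/nodes are tried again after this timeout.
  .set("spark.blacklist.timeout", "1h")
{code}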
Anti-affinity in Kubernetes may be just the trick, though this part of the doc was a little worrisome:
Note: Inter-pod affinity and anti-affinity require substantial amount of processing which can slow down scheduling in large clusters significantly. We do not recommend using them in clusters larger than several hundred nodes.
Blacklisting is most important in large clusters. It seems like that feature is able to do something much more complicated than a simple node blacklist, though; maybe it would already be faster with such a simple anti-affinity rule?
I think this issue is still valid for current versions; it was just opened against 2.3.0, so reopening.
Hi, what's the current status of this issue? Does the Kubernetes mode support node blacklists in the latest version (3.x)?
Also related to SPARK-16630: if that is solved before this for other cluster managers, then we should probably roll similar behavior into this for Kubernetes too.