Spark / SPARK-23485

Kubernetes should support node blacklist

Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0, 3.0.0
    • Fix Version/s: None

    Description

      Spark's BlacklistTracker maintains a list of "bad nodes" which it will not use for running tasks (e.g., because of bad hardware). When running on YARN, this blacklist is used to avoid ever allocating resources on blacklisted nodes: https://github.com/apache/spark/blob/e836c27ce011ca9aef822bef6320b4a7059ec343/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128

      I'm just beginning to poke around the Kubernetes code, so apologies if this is incorrect, but I didn't see any references to scheduler.nodeBlacklist() in KubernetesClusterSchedulerBackend, so it seems this is missing. Thought of this while looking at SPARK-19755, a similar issue on Mesos.
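
      For reference, here is a compressed paraphrase of the YARN-side hook linked above (a sketch, not the exact code at that commit): before each resource request, the backend snapshots the scheduler's current node blacklist and ships it to the cluster manager, so no new executors are allocated on those hosts. The function and parameter names below are illustrative stand-ins.

      {code:scala}
      // Sketch only: `currentNodeBlacklist` stands in for scheduler.nodeBlacklist()
      // and `requestFromClusterManager` for the real RPC to the cluster manager.
      def prepareRequest(
          requestedTotal: Int,
          currentNodeBlacklist: () => Set[String],
          requestFromClusterManager: (Int, Set[String]) => Boolean): Boolean = {
        // Snapshot the blacklist at request time so the cluster manager can
        // avoid placing new executors on those hosts.
        val blacklist: Set[String] = currentNodeBlacklist()
        requestFromClusterManager(requestedTotal, blacklist)
      }
      {code}

      The ask in this ticket is the analogous hook in KubernetesClusterSchedulerBackend, which currently has no equivalent.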

      Attachments

        Issue Links

          Activity

            irashid Imran Rashid added a comment -

            Also related to SPARK-16630... if that is solved before this for other cluster managers, then we should probably roll similar behavior into this for Kubernetes too.

            liyinan926 Yinan Li added a comment -

            I'm not sure node blacklisting applies to Kubernetes. In Kubernetes mode, executors run in containers, which in turn run in pods that the Kubernetes scheduler places on available cluster nodes. The Kubernetes Spark scheduler backend does not keep track of, nor really care about, which nodes the pods run on. This is a concern of the Kubernetes scheduler.

            skonto Stavros Kontopoulos added a comment - - edited

            When an executor fails, all cases are covered via handleDisconnectedExecutors, which is scheduled at some rate and then calls removeExecutor in CoarseGrainedSchedulerBackend, which updates the blacklist info. When we want to launch new executors, CoarseGrainedSchedulerBackend will terminate an executor that has already started on a blacklisted node. IMHO the Kubernetes Spark scheduler should filter nodes and constrain where pods are launched, as it already knows that some nodes are not an option. For example, this could be done with this feature (also relates to https://github.com/kubernetes/kubernetes/issues/14573) in cases where node problems appear.

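            A minimal, self-contained sketch of the kind of check described above, assuming hypothetical callback names rather than Spark's real internals: when an executor comes up on a host the application has already blacklisted, release it and ask for a replacement.

            {code:scala}
            // Sketch only: `killExecutor` and `requestReplacement` are hypothetical
            // callbacks, not Spark's internal APIs.
            def onExecutorRegistered(
                executorId: String,
                host: String,
                nodeBlacklist: Set[String],
                killExecutor: String => Unit,
                requestReplacement: () => Unit): Unit = {
              if (nodeBlacklist.contains(host)) {
                // The app cannot schedule tasks here, so keeping the executor only
                // wastes quota: release it and ask for a replacement pod elsewhere.
                killExecutor(executorId)
                requestReplacement()
              }
            }
            {code}
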
            liyinan926 Yinan Li added a comment -

            The Kubernetes scheduler backend simply creates executor pods through the Kubernetes API server, and the pods are scheduled by the Kubernetes scheduler to run on the available nodes. The scheduler backend is not interested in, nor should it know about, the mapping from pods to nodes. Affinity and anti-affinity, or taints and tolerations, can be used to influence pod scheduling. But it is the responsibility of the Kubernetes scheduler and the kubelets to keep track of node problems and avoid scheduling pods onto problematic nodes.

            skonto Stavros Kontopoulos added a comment - - edited

            Does the scheduler know all the reasons an app might want to define a node as problematic (blacklisted, in our case)? Is there a pointer for the node problems?

            It seems to me that this is the reason taints were introduced (I might be wrong).

            Could the kubelet detect the scenario for which blacklisting was introduced anyway?

            https://blog.cloudera.com/blog/2017/04/blacklisting-in-apache-spark/

            irashid Imran Rashid added a comment -

            Yeah, I don't think it's safe to assume that it's Kubernetes' responsibility to entirely figure out the equivalent of a Spark application's internal blacklist. You can't guarantee that it'll detect hardware issues, and it also might be an issue which is specific to the Spark application (e.g. a missing jar).
            YARN has some basic detection of bad nodes as well, but we observed cases in production where one bad disk would effectively take out an entire application on a large cluster without Spark's blacklisting, as you could have many task failures pile up very quickly.

            That said, the existing blacklist implementation in Spark already handles that case, even without the extra handling I'm proposing here. The Spark app would still have its own node blacklist, and would avoid scheduling tasks on that node.

            However, this is suboptimal because Spark isn't really getting as many resources as it should. E.g., it would request 10 executors, Kubernetes hands it 10, but really Spark can only use 8 of them because 2 live on a node that is blacklisted.

            I don't think this can be directly handled with taints, if I understand correctly. I assume applying a taint is an admin-level thing? That would mean a Spark app couldn't dynamically apply a taint when it discovers a problem on a node (and really, it probably shouldn't be able to, as it shouldn't trust an arbitrary user). Furthermore, it doesn't allow it to be application-specific: blacklisting is really just a heuristic, and you probably do not want it applied across applications. It's not clear what you'd do with multiple apps each with their own blacklist, as nodes go into and out of the blacklist at different times for each app.

            liyinan926 Yinan Li added a comment -

            In the YARN case, yes, it's possible that a node is missing a jar commonly needed by applications. In Kubernetes mode, this will never be the case because the containers either all have a particular jar locally or none of them has it. An image missing a dependency is problematic by itself; this consistency is one of the benefits of being containerized. As for node problems, detecting them and avoiding scheduling pods onto problematic nodes are the concerns of the kubelets and the scheduler. Applications should not need to worry about whether nodes are healthy or not. Node problems happening at runtime cause pods to be evicted from the problematic nodes and rescheduled somewhere else. Having applications be responsible for keeping track of problematic nodes and maintaining a blacklist means unnecessarily jumping into the business of the kubelets and the scheduler.

            foxish

            foxish Anirudh Ramanathan added a comment - - edited

            While mostly I think that K8s would be better suited to make the decision to blacklist nodes, I think we will see that there are causes to consider nodes problematic beyond just the kubelet health checks, so using Spark's blacklisting sounds like a good idea to me.

            Tainting nodes isn't the right solution, given that it's one Spark application's notion of a blacklist and we don't want it applied at a cluster level. We could, however, use node anti-affinity to communicate said blacklist and ensure that certain nodes are avoided by executors of that application.
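
            To illustrate, here is one way a backend could express such a per-application blacklist as node anti-affinity, sketched against the fabric8 Kubernetes client that the Spark backend builds pods with; the helper name and the exact builder chain are assumptions, not existing Spark code.

            {code:scala}
            import io.fabric8.kubernetes.api.model.{Affinity, AffinityBuilder, Pod, PodBuilder}

            // Hypothetical helper: keep an executor pod off blacklisted nodes with a hard
            // (requiredDuringScheduling) node-affinity rule on kubernetes.io/hostname.
            def excludeBlacklistedNodes(pod: Pod, blacklistedNodes: Set[String]): Pod = {
              if (blacklistedNodes.isEmpty) {
                pod
              } else {
                val affinity: Affinity = new AffinityBuilder()
                  .withNewNodeAffinity()
                    .withNewRequiredDuringSchedulingIgnoredDuringExecution()
                      .addNewNodeSelectorTerm()
                        .addNewMatchExpression()
                          .withKey("kubernetes.io/hostname")
                          .withOperator("NotIn")
                          .withValues(blacklistedNodes.toSeq: _*)
                        .endMatchExpression()
                      .endNodeSelectorTerm()
                    .endRequiredDuringSchedulingIgnoredDuringExecution()
                  .endNodeAffinity()
                  .build()
                // Rebuild the pod spec with the affinity attached.
                new PodBuilder(pod).editOrNewSpec().withAffinity(affinity).endSpec().build()
              }
            }
            {code}

            This keeps the blacklist scoped to one application's executor pods, avoiding the cluster-wide effect of a taint, at the cost of the scheduling overhead discussed later in the thread.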

            irashid Imran Rashid added a comment -

            OK, the missing jar was a bad example on Kubernetes... I still wouldn't be surprised if there is some app-specific failure mode we're failing to take into account.

            I think you are too confident in Kubernetes' ability to detect problems with nodes. I don't know what it does, but I don't think it is possible for it to handle this. It would be great if we really could rely on the separation of concerns you want; in practice that just doesn't work, because the app has more info.
            It almost sounds like you think Spark should not even use any internal blacklisting with Kubernetes. From experience with large non-Kubernetes deployments, I think that is a bad idea.

            liyinan926 Yinan Li added a comment - - edited

            It's not that I'm too confident in the capability of Kubernetes to detect node problems. I just don't see it as good practice to worry about node problems at the application level in a containerized environment running on a container orchestration system. For that reason, yes, I don't think Spark on Kubernetes should really need to worry about blacklisting nodes.

            skonto Stavros Kontopoulos added a comment - - edited

            How about locality preferences plus a hardware problem, like the disk problem? I see code in the Spark Kubernetes scheduler related to locality (not sure if it is complete). Will that problem be detected, and will the Kubernetes scheduler consider the node problematic? If so, then I guess there is no need for blacklisting in such scenarios. If, though, this cannot be detected and the task keeps failing but has a locality preference, what will happen? Kubernetes should not just re-run things elsewhere simply because there was a failure; the reason for the failure matters. Is it an app failure or something lower level? (I am new to Kubernetes, so I missed the fact that taints are applied cluster-wide; we just need some similar feature, such as the node anti-affinity already mentioned.)

            foxish Anirudh Ramanathan added a comment - - edited

            Stavros, we do currently differentiate between Kubernetes causing an executor to disappear (node failure) and an exit caused by the application itself.

            Here's some detail on node issues and k8s:

            Node-level problem detection is split between the Kubelet and the Node Problem Detector. This works for some common errors and, in the future, will taint nodes upon detecting them. Some of these errors are listed here. However, there are some categories of errors this setup won't detect. For example: a node with firewall rules/networking that prevents an executor running on it from accessing a particular external service, say to download/stream data. Or a node with issues in its local disk which make a Spark executor on it throw read/write errors. These error conditions may only affect certain kinds of pods on that node and not others.

            Yinan's point, I think, is that it is uncommon for applications on k8s to try to incorporate reasoning about node-level conditions. I think this is because the general expectation is that a failure on a given node will just cause new executors to spin up on different nodes and eventually the application will succeed. However, I can see this being an issue in large-scale production deployments, where we'd see transient errors like the above. Given the existence of a blacklist mechanism and anti-affinity primitives, it wouldn't be too complex to incorporate it, I think.

            aash mcheah, have you guys seen this in practice thus far?

            irashid Imran Rashid added a comment -

            "I think this is because the general expectation is that a failure on a given node will just cause new executors to spin up on different nodes and eventually the application will succeed."

            I think this is the part which may be particularly different in Spark. Some types of failures do not cause the executor to die: it's just a task failure, and the executor itself is still alive. As long as Spark gets heartbeats from the executor, it figures it's still fine. But a bad disk can cause tasks to repeatedly fail. The same could be true for other resources, e.g. a bad GPU, where maybe the GPU is only used by certain tasks.

            When that happens, without Spark's internal blacklisting, an application will very quickly hit many task failures. The task fails, Spark notices that, tries to find a place to assign the failed task, puts it back in the same place; repeat until Spark decides there are too many failures and gives up. It can easily cause your app to fail in ~1 second. There is no communication with the cluster manager through this process; it's all just between Spark's driver and executors. In one case when this happened, YARN's own health checker discovered the problem a few minutes after it occurred, but the Spark app had already failed by that point. From one bad disk in a cluster with > 1000 disks.

            Spark's blacklisting is really meant to be complementary to the type of node health checks you are talking about in Kubernetes. The blacklisting in Spark intentionally does not try to figure out the root cause of the problem, as we don't want to get into the game of enumerating all of the possibilities. It's a heuristic which makes it safe for Spark to keep going in the face of these un-caught errors, but then retries the resources when it would be safe to do so. (Discussed in more detail in the design doc on SPARK-8425.)

            Anti-affinity in Kubernetes may be just the trick, though this part of the doc was a little worrisome:

            "Note: Inter-pod affinity and anti-affinity require substantial amount of processing which can slow down scheduling in large clusters significantly. We do not recommend using them in clusters larger than several hundred nodes."

            Blacklisting is most important in large clusters. It seems like it's able to do something much more complicated than a simple node blacklist, though; maybe it would already be faster with such a simple anti-affinity rule?
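
            The heuristic described above is driven by Spark's existing blacklist settings; the snippet below only lists the 2.x-era knobs to make the failure arithmetic concrete, and is not new Kubernetes-specific configuration.

            {code:scala}
            import org.apache.spark.SparkConf

            // With spark.task.maxFailures = 4 (the default) and no blacklisting, four rapid
            // failures of one task on a single bad node are enough to fail the whole job.
            // Blacklisting steers retries away from the suspect executor/node instead.
            val conf = new SparkConf()
              .set("spark.blacklist.enabled", "true")
              // failed tasks on one executor before it is blacklisted for the stage
              .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
              // blacklisted executors on one node before the node is blacklisted for the stage
              .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")
              // how long a blacklisted executor/node is avoided before being retried
              .set("spark.blacklist.timeout", "1h")
            {code}

            What this ticket asks for is the extra step of feeding the resulting node blacklist back to the cluster manager side, so replacement executor pods avoid those nodes as well.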

            irashid Imran Rashid added a comment -

            I think this issue is still valid for current versions; it was just opened against 2.3.0, so reopening.

            wanglijie Lijie Wang added a comment -

            Hi, what's the current status of this issue? Does Kubernetes mode support node blacklists in the latest version (3.x)?


            People

              Assignee: Unassigned
              Reporter: irashid Imran Rashid
              Votes: 0
              Watchers: 9