SPARK-650

Add a "setup hook" API for running initialization code on each executor

    Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

      Description

      Would be useful to configure things like reporting libraries


          Activity

          aash Andrew Ash added a comment -

          As mentioned in SPARK-572, static classes' initialization methods are being "abused" to perform this functionality.

          Matei Zaharia do you still feel that a per-executor initialization function is a hook that Spark should expose in its public API?

          lars_francke Lars Francke added a comment -

          Not Matei Zaharia, but I think this would be a good idea to have. Abusing another undocumented concept doesn't seem like a nice way to treat a useful and common use-case.

          Skamandros Michael Schmeißer added a comment -

          I would need this feature as well to perform some initialization of the logging system (which reads its configuration from an external source rather than just a file).

          luisramos Luis Ramos added a comment -

          I have similar requirements to Michael's – this would be a very useful feature to have.

          holdenk holdenk added a comment -

          I think this is a duplicate of SPARK-636, yes?

          Skamandros Michael Schmeißer added a comment - edited

          To me, the two seem related, but not exact duplicates. SPARK-636 seems to aim for a more generic mechanism.

          holdenk holdenk added a comment -

          Would people feel OK if we marked this as a duplicate of SPARK-636, since it does seem like this is a subset of 636?

          Skamandros Michael Schmeißer added a comment -

          I disagree that those issues are duplicates. SPARK-636 looks for a generic way to execute code on the Executors, but not for a reliable and easy mechanism to execute code during Executor initialization.

          srowen Sean Owen added a comment -

          In practice, these should probably all be WontFix as it hasn't mattered enough to implement in almost 4 years. It really doesn't matter.

          Skamandros Michael Schmeißer added a comment -

          Then somebody should please explain to me how this doesn't matter, or rather how certain use cases are supposed to be solved. We need to initialize each JVM and connect it to our logging system, set correlation IDs, initialize contexts and so on. I guess that most users have just implemented work-arounds as we did, but in an enterprise environment, this is really not the preferable long-term solution to me. Plus, I think that it would really not be hard to implement this feature for someone who has knowledge of the Spark executor setup.

          srowen Sean Owen added a comment -

          Sorry, I mean the status doesn't matter. Most issues this old are obsolete or de facto won't-fix. Resolving it or not doesn't matter.

          I would even say this is 'not a problem', because a simple singleton provides once-per-executor execution of whatever you like. It's more complex to make a custom mechanism that makes you route this via Spark. That's probably why this hasn't proved necessary.
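
          A minimal sketch of that singleton, assuming a guarded once-per-JVM initializer (the names here are illustrative, not a Spark API):

          object Reporting {
            @volatile private var initialized = false

            // Runs the setup at most once per JVM, i.e. once per executor,
            // no matter how many tasks call it.
            def initOnce(): Unit = if (!initialized) synchronized {
              if (!initialized) {
                // ... configure reporting/logging libraries here ...
                initialized = true
              }
            }
          }

          Any task code (e.g. the first line of a map function) can call Reporting.initOnce(); only the first call on each executor does any work.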

          oarmand Olivier Armand added a comment -

          Sean, a singleton is not the best option in our case. The Spark Streaming executors are writing to HBase, we need to initialize the HBase connection. The singleton seems (or seemed when we tested it for our customer a few months after this issue was raised) to be created when the first RDD is processed by the executor, and not when the driver starts. This imposes very high processing time for the first events.

          srowen Sean Owen added a comment -

          If you need init to happen ASAP when the driver starts, isn't any similar mechanism going to be about the same in this regard? This cost is paid just once, and I don't think in general startup is very low latency for any Spark app.

          lars_francke Lars Francke added a comment -

          I also have to disagree with this being a duplicate or obsolete.

          Olivier Armand and Michael Schmeißer already mentioned reasons regarding the duplication.

          About it being obsolete: I have seen multiple clients facing this problem, finding this issue and hoping it'd get fixed some day. I would hazard a guess and say that most users of Spark have no JIRA account here and do not register or log in just to vote for this issue. That said: This issue is (with six votes) in the top 150 out of almost 17k total issues in the Spark project.

          As it happens this is a non-trivial thing to implement in Spark (as far as I can tell from my limited knowledge of the inner workings) so it's pretty hard for a "drive by" contributor to help here.

          You had the discussion about community perception on the mailing list (re: Spark Improvement Proposals) and this issue happens to be one of those that at least I see popping up every once in a while in discussions with clients.

          I would love to see this issue staying open as a feature request and have some hope that someone someday will implement it.

          srowen Sean Owen added a comment -

          As you wish, but, I disagree with this type of reasoning about JIRAs. I don't think anyone has addressed why a singleton isn't the answer. I can think of corner cases, but, that's why I suspect it isn't something that has needed implementing.

          lars_francke Lars Francke added a comment - edited

          I can only come up with three reasons at the moment. I hope they all make sense.

          1) Singletons/Static Initialisers run once per Class Loader where this class is being loaded/used. I haven't actually seen this being a problem (and it might actually be desired behaviour in this case) but making the init step explicit would prevent this from ever becoming one.
          2) I'd like to fail fast for some things and not upon first access (which might be behind a conditional somewhere)
          3) It is hard enough to reason about where some piece of code is running as it is (Driver or Task/Executor), adding Singletons to the mix makes it even more confusing.

          Thank you for reopening!

          srowen Sean Owen added a comment -

          Reopening doesn't do anything by itself, or cause anyone to consider this. If this just sits for another year, it will have been a tiny part of a larger problem. I would ask those asking to keep this open to advance the discussion, or else I think you'd agree it eventually should be closed. (Here, I'm really speaking about hundreds of issues like this here, not so much this one.)

          Part of the problem is that I don't think the details of this feature request were ever elaborated. I think that if you dig into what it would mean, you'd find that a) it's kind of tricky to define and then implement all the right semantics, and b) almost any use case along these lines in my experience is resolved as I suggest, with a simple per-JVM initialization. If the response lately here is, well, we're not quite sure how that works, then we need to get to the bottom of that, not just insist that the issue stay open.

          To your points:

          • The executor is going to load user code into one classloader, so we do have that an executor = JVM = classloader.
          • You can fail things as fast as you like by invoking this init as soon as you like in your app.
          • It's clear where things execute, or else, we must assume app developers understand this or else all bets are off. The driver program executes things in the driver unless they're part of a distributed map() etc operation, which clearly execute on the executor.

          These IMHO aren't reasons to design a new, different, bespoke mechanism. That has a cost too, if you're positing that it's hard to understand when things run where.

          The one catch I see is that, by design, we don't control which tasks run on what executors. We can't guarantee init code runs on all executors this way. But, is it meaningful to initialize an executor that never sees an app's tasks? it can't be. Lazy init is a good thing and compatible with the Spark model. If startup time is an issue (and I'm still not clear on the latency problem mentioned above), then it gets a little more complicated, but, that's also a little more niche: just run a dummy mapPartitions at the outset on the same data that the first job would touch, even asynchronously if you like with other driver activities. No need to wait; it just gives the init a head-start on the executors that will need it straight away.
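
          As a sketch of that head-start, assuming a hypothetical guarded initializer Init.once() and an RDD named data that the first real job will read:

          import scala.concurrent.Future
          import scala.concurrent.ExecutionContext.Implicits.global

          // Fire-and-forget warm-up: touch the same partitions the first job
          // will use, so the init runs early on exactly those executors.
          val warmUp = Future {
            data.foreachPartition(_ => Init.once())
          }
          // ... keep building and submitting the real job; no need to await warmUp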

          That's just my opinion of course, but I think those are the questions that would need to be answered to argue something happens here.

          oarmand Olivier Armand added a comment -

          > "just run a dummy mapPartitions at the outset on the same data that the first job would touch"

          But this wouldn't work for Spark Streaming? (our case).

          srowen Sean Owen added a comment -

          It would work in this case to immediately schedule initialization on the executors because it sounds like data arrives immediately in your case. The part I am missing is how it can occur faster than this with another mechanism.

          oarmand Olivier Armand added a comment -

          Data doesn't necessarily arrive immediately, but we need to ensure that when it arrives, lazy initialization doesn't introduce latency.

          srowen Sean Owen added a comment -

          Yeah that's a decent use case, because latency is an issue (streaming) and you potentially have time to set up before latency matters.

          You can still use this approach because empty RDDs arrive if no data has, and empty RDDs can still be repartitioned. Here's a way, if the first RDD has no data, to do something once per partition, which ought to amount to at least once per executor:

          // Only try this on the first batch of the stream
          var first = true
          lines.foreachRDD { rdd =>
            if (first) {
              if (rdd.isEmpty) {
                // No data yet: spread empty partitions over the cluster and run
                // the init once per partition, i.e. roughly once per executor
                rdd.repartition(sc.defaultParallelism).foreachPartition(_ => Thing.initOnce())
              }
              first = false
            }
          }
          

          "ought", because, there isn't actually a guarantee that it will put the empty partitions on different executors. In practice, it seems to, when I just tried it.

          That's a partial solution, but it's an optimization anyway, and maybe it helps you right now. I am still not sure it means this needs a whole mechanism, if this is the only type of use case. Maybe there are others.

          Skamandros Michael Schmeißer added a comment -

          Ok, let me explain the specific problems that we have encountered, which might help to understand the issue and possible solutions:

          We need to run some code on the executors before anything gets processed, e.g. initialization of the log system or context setup. To do this, we need information that is present on the driver, but not on the executors. Our current solution is to provide a base class for Spark function implementations which contains the information from the driver and initializes everything in its readObject method. Since multiple narrow-dependent functions may be executed on the same executor JVM subsequently, this class needs to make sure that initialization doesn't run multiple times. Sure, that's not hard to do, but if you mix setup and cleanup logic for functions, partitions and/or the JVM itself, it can get quite confusing without explicit hooks.
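
          A sketch of the shape of that base class (hypothetical names, not our actual code):

          // Carries driver-side info and initializes the executor JVM when a
          // function instance is deserialized there; the guard inside initOnce
          // makes repeated deserialization on the same JVM a no-op.
          abstract class InitializingFunction[T, R](driverInfo: String)
              extends (T => R) with Serializable {
            private def readObject(in: java.io.ObjectInputStream): Unit = {
              in.defaultReadObject()             // restores driverInfo on the executor
              ExecutorState.initOnce(driverInfo) // hypothetical guarded initializer
            }
          }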

          So, our solution basically works, but with that approach, you can't use lambdas for Spark functions, which is quite inconvenient, especially for simple map operations. Even worse, if you use a lambda or otherwise forget to extend the required base class, the initialization doesn't occur and very weird exceptions follow, depending on which resource your function tries to access during its execution. Or if you have very bad luck, no exception will occur, but the log messages will get logged to an incorrect destination. It's very hard to prevent such cases without an explicit initialization mechanism and in a team with several developers, you can't expect everyone to know what is going on there.

          srowen Sean Owen added a comment -

          This is still easy to do with mapPartitions, which can call initWithTheseParamsIfNotAlreadyInitialized(...) once per partition, which should guarantee it happens once per JVM before anything else proceeds. I don't think you need to bury it in serialization logic. I can see there are hard ways to implement this, but I believe an easy way is still readily available within the existing API mechanisms.

          Skamandros Michael Schmeißer added a comment -

          But I'll need to have an RDD to do this, I can't just do it during the SparkContext setup - right now, we have multiple sources of RDDs and every developer would still need to know that they have to run this code after creating an RDD, won't they? Or is there some way to use a "pseudo-RDD" right after creation of the SparkContext to execute the init code on the executors?

          srowen Sean Owen added a comment -

          But, why do you need to do it before you have an RDD? You can easily make this a library function. Or, just some static init that happens on demand whenever a certain class is loaded. The nice thing about that is that it's transparent, just like with any singleton / static init in the JVM.

          If you really want, you can make an empty RDD and repartition it and use that as a dummy, but it only serves to do some initialization early that would happen transparently anyway.
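
          For instance (a sketch; as with the streaming variant above, there is no hard guarantee that every executor receives one of the empty partitions):

          // Dummy job: repartitioning an empty RDD still schedules one (empty)
          // task per partition, giving the init an early run on the executors.
          sc.emptyRDD[Int]
            .repartition(sc.defaultParallelism)
            .foreachPartition(_ => Thing.initOnce())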

          Skamandros Michael Schmeißer added a comment -

          What if I have a Hadoop InputFormat? Then, certain things happen before the first RDD exists, don't they?

          I'll give the solution with the empty RDD a shot next week; it sounds a little better than what we have right now, but it still relies on certain internals of Spark which are most likely undocumented and might change in the future. I've had the feeling that Spark basically has a functional approach with the RDDs - couldn't executing anything on an empty RDD be optimized to just do nothing?

          srowen Sean Owen added a comment -

          BTW I am not suggesting an "empty RDD" for your case. That was specific to the streaming scenario.

          For this, again, why not just access some initialization method during class init of some class that is referenced wherever you want, including a custom InputFormat? This can be made to happen once per JVM (class loader), from any code, at class init time before anything else can happen. It's just a standard Java mechanism.

          If you mean it requires some configuration not available at class-loading time you can still make such an init take place wherever, as soon as, such configuration is available. Even in an InputFormat.

          Although I can imagine corner cases where this becomes hard, I think it's over-thinking this to imagine a whole new lifecycle method to accomplish what basic JVM mechanisms allow.

          Skamandros Michael Schmeißer added a comment -

          I agree that static initialization would solve the problem for cases where everything is known or can be loaded at class-loading time, e.g. from property files in the artifact itself.

          For situations like RecordReaders, it might also work, because they have an initialize method where they get contextual information that could have been enriched with the required values from the driver.

          However, we also have other cases, where information from the driver is needed. Imagine the following case: We have a temporary directory in HDFS which is determined by the Oozie workflow instance ID. The driver knows this information, because it is provided by Oozie via main method arguments. The executor needs this information as well, e.g. to load some data that is required to initialize a static context. Then, the question arises: How does the information get to the executor?

          Either with the function instance, which would mean that the developer of the function needs to know that he has to call an initialization method in every function, or at least in every first function on an RDD (which he probably doesn't know, because he received the RDD from a different part of the application). Or with an explicit mechanism which is executed before the developer's functions run on any executor - which would lead me again to the "empty RDD" workaround.

          srowen Sean Owen added a comment -

          Yep, if you must pass some configuration, it generally can't happen magically at class-loading time. You can provide an "initIfNeeded(conf)" method that must be explicitly called in key places, but that's simple and canonical Java practice.

          In your example, there's no need to do anything. Just use the info in the function the executor runs. It's passed in the closure. This is entirely normal Spark.
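
          In the Oozie example that might look like this (a sketch; tmpDir stands in for the workflow-derived path and Context.initIfNeeded for a hypothetical guarded initializer):

          val tmpDir = args(0)  // known only on the driver, e.g. passed by Oozie

          rdd.map { record =>
            // tmpDir is captured in the closure and shipped with the task;
            // the guarded init uses it on the first call per executor JVM.
            Context.initIfNeeded(tmpDir)
            record
          }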

          robert.neumann Robert Neumann added a comment -

          I am supporting Olivier Armand. We need a way in our Streaming job to setup an HBase connection per executor (and not per partition). A Singleton is not something we are looking at for this purpose.

          srowen Sean Owen added a comment -

          Why would a singleton not work? This is really the essential question in this thread.

          robert.neumann Robert Neumann added a comment -

          Sean, I agree this is the essential question in this thread. If we get this sorted out, then we are good and can achieve consensus on what to do with this ticket.
          A singleton "works" indeed. However, from a software engineering point of view it is not nice. There exists a class of Spark Streaming jobs that requires "setup -> do -> cleanup" semantics. The framework (in this case Spark Streaming) should explicitly support these semantics through appropriate API hooks. A singleton instead would hide these semantics, and you would need to implement some lazy code to check whether an HBase connection was already set up or not; the singleton would need to do this for every write operation to HBase.
          I do not think that application logic (the singleton within the Spark Streaming job) is the right place to wire in the "setup -> do -> cleanup" pattern. It is a generic pattern and there exists a class of Spark Streaming jobs (not only one specific Streaming job) that are based on this pattern.

          hvanhovell Herman van Hovell added a comment -

          Lars Francke, Michael Schmeißer, Robert Neumann: If you think that this is an important feature, then write a design doc and open a PR.

          robert.neumann Robert Neumann added a comment -

          OK. Will do.

          Skamandros Michael Schmeißer added a comment -

          A singleton is not really feasible if additional information is required which is known (or determined) by the driver and thus needs to be sent to the executors for the initialization to happen. In this case, the options are 1) use some side-channel that is "magically" inferred by the executor, 2) use an empty RDD, repartition it to the number of executors and run mapPartitions on it, 3) piggy-back the JavaSerializer to run the initialization before any function is called or 4) require every function which may need the resource to initialize it on its own.

          Each of these options has significant drawbacks in my opinion. While 4 sounds good for most cases, it has some cons which I've described earlier (my comment from Oct 16) that make it unfeasible for our use-case. Option 1 might be possible, but the data flow wouldn't be all that obvious. Right now, we go with a mix of options 2 and 3 (try to determine the number of executors and, if you can't, hijack the serializer), but really, this is hacky and might break in future releases of Spark.

          Skamandros Michael Schmeißer added a comment -

          Thanks Robert Neumann! I am ready to help, if I can.

          srowen Sean Owen added a comment -

          Why? Info X can be included in the closure, and the executor can call "single.getInstance(X)" to pass this info. Init happens only once in any event.

          Skamandros Michael Schmeißer added a comment -

          Sure it can be included in the closure and this was also our first solution to the problem. But if the application has many layers and you need the resource which requires info X to initialize often, it soon gets very inconvenient because you have to pass X around a lot and pollute your APIs.

          Thus, our next solution was to create a base function class which takes X in its constructor and makes sure that the resource is initialized on the executor side if it wasn't before. The drawback of this solution is that the function developer can forget to extend the function base class, and then he may or may not be able to access the resource, depending on whether a function has run before on the executor which performed the initialization. This is really error-prone (it actually led to errors) and, even if done correctly, prevents lambdas from being used for functions.

          As a result, we now use the "empty RDD" approach or piggy-back the Spark JavaSerializer. Both work fine and initialize the executor-side resource properly on all executors. So, from a function developer's point of view that's nice, but overall, the solution relies on Spark internals to work, which is why I would rather have an explicit mechanism to perform such an initialization.

          hvanhovell Herman van Hovell added a comment -

          If you only try to propagate information, then you can use SparkContext.localProperties and the TaskContext on the executor side. They provide the machinery to do this.
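
          For example (a sketch; TaskContext.getLocalProperty is only available in Spark versions that expose it, 2.0 or later as far as I know):

          import org.apache.spark.TaskContext

          // Driver side: attached to every job submitted from this thread.
          sc.setLocalProperty("myapp.tmpDir", tmpDir)

          // Executor side, inside any task:
          rdd.foreachPartition { _ =>
            val dir = TaskContext.get().getLocalProperty("myapp.tmpDir")
            Context.initIfNeeded(dir) // hypothetical guarded initializer
          }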

          hvanhovell Herman van Hovell added a comment -

          A creatively applied broadcast variable might also do the trick BTW.
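
          For example (a sketch, reusing a hypothetical guarded initializer):

          // Broadcast the driver-side config once; each executor fetches it
          // lazily on first access and initializes from it.
          val initConf = sc.broadcast(config)

          rdd.mapPartitions { it =>
            Context.initIfNeeded(initConf.value)
            it
          }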

          Skamandros Michael Schmeißer added a comment -

          No, it's not just about propagating information - some code actually needs to be run. We have some static utilities which need to be initialized, but they don't know anything about Spark but are rather provided by external libraries. Thus, we need to actually trigger the initialization on all executors. The only other way that I see is to wrap all access to those external utilities with something on our side that is Spark-aware and initializes them if needed. But I think compared to this, our current solution is better.

          rdub Ryan Williams added a comment - edited

          Both suggested workarounds here are lacking or broken / actively harmful, afaict, and the use case is real and valid.

          The ADAM project struggled for >2 years with this problem:

          • a 3rd-party OutputFormat required this field to be set
          • the value of the field is computed on the driver, and needs to somehow be sent to and set in each executor JVM

          mapPartitions hack

          Some attempts to set the field via a dummy mapPartitions job actually added pernicious, non-deterministic bugs.

          In general Spark seems to provide no guarantees that ≥1 tasks will get scheduled on each executor in such a situation:

          • in the above, node locality resulted in some executors being missed
          • dynamic-allocation also offers chances for executors to come online later and never be initialized

          object/singleton initialization

          How can one use singleton initialization to pass an object from the driver to each executor? Maybe I've missed this in the discussion above.

          In the end, ADAM decided to write the object to a file and route that file's path to the OutputFormat via a hadoop configuration value, which is pretty inelegant.

          Another use case

          I have another need for this atm where regular lazy-object-initialization is also insufficient: due to a rough edge in Scala programs' classloader configuration, FileSystemProviders in user JARs are not loaded properly.

          A workaround discussed in the 1st post on that issue fixes the problem, but needs to be run before FileSystemProvider.installedProviders is first called on the JVM, which can be triggered by numerous java.nio.file operations.

          I don't see a clear way to work in code that will always lazily call my FileSystems.load function on each executor, let alone ensure that it happens before any code in the JAR calls e.g. Paths.get.

          mboes Mathieu Boespflug added a comment -

          Michael Schmeißer how did you manage to hook `JavaSerializer`? I tried doing so myself, by defining a new subclass, but then I need to make sure that new class is installed on all executors. Meaning I have to copy a .jar on all my nodes manually. For some reason Spark won't try looking for the serializer inside my application JAR.

          riteshtijoriwala Ritesh Tijoriwala added a comment -

          Michael Schmeißer - I would also like to know about hooking 'JavaSerializer'. I have a similar use case where I need to initialize a set of objects/resources on each executor. I would also like to know if anybody has a way to hook into some "clean up" on each executor when 1) the executor shuts down and 2) a batch finishes, before the next batch starts.

          Skamandros Michael Schmeißer added a comment - edited

          In a nutshell, we have our own class "MySerializer" which is derived from org.apache.spark.serializer.JavaSerializer and performs our custom initialization in MySerializer#newInstance before calling the super method org.apache.spark.serializer.JavaSerializer#newInstance. Then, when building the SparkConf for initialization of the SparkContext, we add pSparkConf.set("spark.closure.serializer", MySerializer.class.getCanonicalName());.

          We package this with our application JAR and it works. So I think you have to look at your classpath configuration Mathieu Boespflug. In our case, the JAR which contains the closure serializer is listed in the following properties (we use Spark 1.5.0 on YARN in cluster mode):

          • driver.extraClassPath
          • executor.extraClassPath
          • yarn.secondary.jars
          • spark.yarn.secondary.jars
          • spark.driver.extraClassPath
          • spark.executor.extraClassPath

          If I recall it correctly, the variants without the "spark." prefix are produced by us because we prefix all of our properties with "spark." to transfer them via Oozie and unmask them again later, so you should only need the properties with the "spark." prefix.

          Regarding the questions of Ritesh Tijoriwala: 1) Please see the related issue SPARK-1107. 2) You can add a TaskCompletionListener with org.apache.spark.TaskContext#addTaskCompletionListener(org.apache.spark.util.TaskCompletionListener). To get the current TaskContext on the executor, just use org.apache.spark.TaskContext#get. We have some functionality to log the progress of a function in fixed intervals (e.g. every 1,000 records). To do this, you can use mapPartitions with a custom iterator.
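
          For reference, a sketch of that serializer hook (the initializer is an assumption on our side; note that the spark.closure.serializer setting it relies on was later removed, see the next comment):

          import org.apache.spark.SparkConf
          import org.apache.spark.serializer.{JavaSerializer, SerializerInstance}

          // Piggy-backs executor initialization onto the closure serializer:
          // newInstance() runs on each executor before task closures are
          // deserialized there, so the guarded init happens first.
          class MySerializer(conf: SparkConf) extends JavaSerializer(conf) {
            override def newInstance(): SerializerInstance = {
              ExecutorState.initOnce() // hypothetical guarded initializer
              super.newInstance()
            }
          }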

          riteshtijoriwala Ritesh Tijoriwala added a comment -

          Michael Schmeißer - Any similar tricks for spark 2.0.0? I see the config option to set the closure serializer has been removed - https://issues.apache.org/jira/browse/SPARK-12414. Currently we do "set of different things" to ensure our classes are loaded/instantiated before spark starts execution of its stages. It would be nice to consolidate this in one place/hook.

          Skamandros Michael Schmeißer added a comment -

          Ritesh Tijoriwala - Sorry, but I am not familiar with Spark 2.0.0 yet. But what I can say is that we have raised a Cloudera support case to address this issue so maybe we can expect some help from this side.

          louisb@broadinstitute.org Louis Bergelson added a comment -

          I can't understand how people are dismissing this as not an issue. There are many cases where you need to initialize something on an executor, and many of them need input from the driver. All of the given workarounds are terrible hacks and at best force bad design, and at worst introduce confusing and non-deterministic bugs. Any time that the recommended solution to a common problem that many people are having is to abuse the Serializer in order to trick it into executing non-serialization code it seems obvious that there's a missing capability in the system.

          The fact that executors can come online and go offline at any time during the run makes it especially essential that we have a robust way of initializing them. I just really don't understand the opposition to adding an initialization hook; it would solve so many problems in a clean way and doesn't seem like it would be particularly problematic on its own.

          srowen Sean Owen added a comment -

          I still don't see an argument against my primary suggestion: the singleton. The last comment on it merely asked how to do it, and it's quite possible. It has nothing to do with the serializer.

          Skamandros Michael Schmeißer added a comment -

          Please see my comment from 05/Dec/16 12:39 and the following discussion - we are going in circles here. I have tried to explain, as well as I can, the (real) problems we were facing, the solution we applied to them, and why other solutions were dismissed. The fact is: there are numerous people here who have the same issues and are glad to apply the workaround, because "using the singleton" doesn't seem to provide a solution for them either. Perhaps we all just don't understand how to do it, but then something is still missing - at least documentation, isn't it? What I can add is that we have put experienced developers on this topic, developers who have used quite a few singletons.

          srowen Sean Owen added a comment -

          Are you looking for an example of how it works? Something like this, for what I assume is the common case of initializing a connection to an external resource:

          val config = ...
          df.mapPartitions { it =>
            MyResource.initIfNeeded(config)
            it.map(...)
          }
          
          ...
          
          object MyResource {
            private var initted = false
            // Runs the initialization at most once per JVM; later calls are no-ops.
            def initIfNeeded(config: Config): Unit = this.synchronized {
              if (!initted) {
                initializeResource(config)
                initted = true
              }
            }
          }
          

          If config is big or tricky to pass around, it can instead be read directly from some location, or wrapped up in an object in your code. The method can actually be:

          df.mapPartitions { it =>
            MyResource.initIfNeeded()
            it.map(...)
          }
          
          ...
          
          object MyResource {
            private var initted = false
            // Parameterless variant: the config is read locally on first use.
            def initIfNeeded(): Unit = this.synchronized {
              if (!initted) {
                val config = getConf()
                initializeResource(config)
                initted = true
              }
            }
          }
          

          You get the idea. This is not a special technique, not even really a singleton - just a method that executes the first time it's called and does nothing thereafter.
          If you don't like having to call initIfNeeded, call it in whatever code produces the resource connection, as in the sketch below.
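
          For illustration, a minimal sketch of this "call it where the resource is produced" variant, using Scala's thread-safe lazy initialization; Connection, getConf, initializeResource, openConnection and use are assumed names:

          object MyResource {
            // Initialized on first access in each executor JVM; the Scala compiler
            // makes lazy val initialization thread-safe.
            lazy val connection: Connection = {
              val config = getConf()        // assumed: loads config locally
              initializeResource(config)    // assumed: one-time setup
              openConnection(config)        // assumed: creates the connection
            }
          }

          df.mapPartitions { it =>
            val conn = MyResource.connection     // first access runs the setup
            it.map(record => use(conn, record))  // assumed: per-record work
          }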

          We can imagine objections and answers like this all day, I'm sure. I think this covers all the use cases I can imagine a setup hook covering, so the only question is whether it's easy enough. You're saying it's unusably hard and proposing a hack on the serializer that sounds much more error-prone; I just cannot agree with that. This is much simpler than the other solutions people are arguing against here, which I also think are too complex. Was it just a misunderstanding of the proposal?

          Louis Bergelson, have you considered the implications of the semantics of a setup hook? For example, if setup fails on an executor, can you schedule a task that needed it? How do you track that? Here, the semantics are obvious.

          louisb@broadinstitute.org Louis Bergelson added a comment -

          Sean Owen Thanks for the reply and the example. Unfortunately, I still believe that the singleton approach doesn't work well for our use case.

          We don't have a single resource which needs initialization and can be wrapped in a singleton. We have a sprawl of legacy dependencies that need to be initialized in certain ways before use and can then be called into from literally hundreds of entry points. One of the things that needs initializing is the set of FileSystemProviders that Ryan Williams mentioned above; this has to be done before potentially any file access in our dependencies. It's implausible to wrap all of our library code in singleton objects, and it's difficult to always call initResources() before every library call - it requires a lot of discipline on the part of the developers. Since we develop a framework for biologists to use to write tools, anything that has to be enforced by convention isn't ideal and is likely to cause problems. People will forget to start their work by calling initResources(), or worse, they'll remember to call initResources() but only at the start of the first stage. Then they'll run into issues when executors die and are replaced during a later stage, and the initialization doesn't run on the new executor (see the sketch below).
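
          For illustration, a minimal sketch of that pitfall; input, parse, process and initResources() are assumed names:

          // Fragile: initResources() runs only in the first stage. An executor that
          // joins during a later stage never executes it.
          val stage1 = input.mapPartitions { it => initResources(); it.map(parse) }
          val stage2 = stage1.repartition(100).map(process)  // may run uninitialized

          // The convention that must be enforced everywhere: guard every stage.
          val safeStage2 = stage1.repartition(100).mapPartitions { it =>
            initResources()
            it.map(process)
          }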

          For something that could be cleanly wrapped in a singleton, I agree that the semantics are obvious, but when you're calling init() before running your code, the semantics are confusing and error-prone.

          I'm sure there are complications to introducing a setup hook, but the one you mention seems simple enough to me: if setup fails, that executor is killed and can't schedule tasks. There would probably have to be a mechanism for timing out after a certain number of failed executor starts, but I suspect something like that already exists for other sorts of failures.

          srowen Sean Owen added a comment -

          I can also imagine cases involving legacy code that make this approach hard to implement. Still, it's possible with enough 'discipline', but that is true of wrangling any legacy code. I don't think the question of semantics is fully appreciated here. Is killing the app's other tasks on the same executor reasonable behavior? How many failures are allowed by default by this new mechanism? What do you do if init never returns, and for how long? Are you willing to reschedule the task on another executor? How does it interact with locality? I know, any change raises questions, but this one raises a lot.

          It's a conceptual change in Spark, and I'm just sure it's not going to happen three years in. Tasks have never had a special status or lifecycle w.r.t. executors, and that's a positive thing, really.

          hn5092 yiming.xu added a comment -

          I need a hook too. In some cases, we need to initialize something, like a Spring init bean.

          Show
          hn5092 yiming.xu added a comment - I need a hook too. Some case, We need init something like spring initbean

            People

            • Assignee:
              Unassigned
            • Reporter:
              matei Matei Zaharia
            • Votes:
              14
            • Watchers:
              27
