SPARK-3129

Prevent data loss in Spark Streaming on driver failure using Write Ahead Logs

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 1.0.1, 1.0.2, 1.0.3, 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: DStreams
    • Labels:
      None
    • Target Version/s:

      Description

      Spark Streaming can lose small amounts of data when the driver goes down and the sending system cannot re-send the data (or the data has already expired on the sender side). This currently affects all receivers.

      The solution we propose is to reliably store all the received data into HDFS. This will allow the data to persist through driver failures, and therefore can be processed when the driver gets restarted.

      The high level design doc for this feature is given here.
      https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?usp=sharing

      This major task has been divided into sub-tasks:

      • Implementing a write ahead log management system that can manage rolling write ahead logs - write to the log, recover on failure, and clean up old logs
      • Implementing an HDFS-backed block RDD that can read data either from Spark's BlockManager or from HDFS files
      • Implementing a ReceivedBlockHandler interface that abstracts out the functionality of saving received blocks
      • Implementing a ReceivedBlockTracker and other associated changes in the driver that allow the metadata of received blocks and block-to-batch allocations to be recovered upon driver restart
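The first sub-task above can be sketched conceptually. Below is a minimal, hypothetical rolling write ahead log in Python - not Spark's actual Scala implementation; the class name, segment naming, and use of a local `fsync` as a stand-in for HDFS hflush/hsync are all invented for illustration. It length-prefixes records so recovery knows record boundaries, rolls to a new segment periodically, replays all segments on restart, and deletes old segments.

```python
import os
import struct
import time

class RollingWriteAheadLog:
    """Hypothetical sketch of a rolling WAL: append records to the current
    segment, roll to a new segment periodically, recover all records on
    restart, and clean up segments older than a threshold."""

    def __init__(self, log_dir, roll_interval_secs=60):
        self.log_dir = log_dir
        self.roll_interval = roll_interval_secs
        os.makedirs(log_dir, exist_ok=True)
        self._open_segment()

    def _open_segment(self):
        self.segment_start = time.time()
        path = os.path.join(self.log_dir, "log-%d" % int(self.segment_start * 1000))
        self.segment = open(path, "ab")

    def write(self, record: bytes):
        if time.time() - self.segment_start > self.roll_interval:
            self.segment.close()
            self._open_segment()
        # Length-prefixed record so recovery knows where each one ends.
        self.segment.write(struct.pack(">I", len(record)) + record)
        self.segment.flush()
        os.fsync(self.segment.fileno())  # local stand-in for HDFS hflush/hsync

    def recover(self):
        """Replay every record from every segment, oldest first."""
        records = []
        for name in sorted(os.listdir(self.log_dir)):
            with open(os.path.join(self.log_dir, name), "rb") as f:
                while True:
                    header = f.read(4)
                    if len(header) < 4:
                        break
                    (length,) = struct.unpack(">I", header)
                    records.append(f.read(length))
        return records

    def clean_old(self, before_millis):
        """Delete segments whose start timestamp is older than the cutoff."""
        for name in os.listdir(self.log_dir):
            if int(name.split("-")[1]) < before_millis:
                os.remove(os.path.join(self.log_dir, name))
```

The per-record sync is what makes writes durable (and slow); a real implementation would batch records before syncing.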
      Attachments

      • SecurityFix.diff (1 kB, Hari Shreedharan)

        Issue Links

          Activity

          hshreedharan Hari Shreedharan added a comment -

          This doc is an early list of fixes. I may have missed some, and/or there may be better ways to do this. Please post any feedback you have! Thanks!

          tgraves Thomas Graves added a comment -

          A couple of random thoughts on this for YARN. YARN added this ability in 2.4.0, and you have to tell it you want it in the application submission context. So you will have to handle other versions of YARN properly where it's not supported.
          I believe YARN will tell you what nodes you have containers already running on, but you'll have to figure out details about ports, etc. I haven't looked at all the specifics.

          You'll have to figure out how to do authentication properly. This gets forgotten many times.

          I think we should flesh out more of the high-level design concerns between YARN/standalone/Mesos, and on YARN the client/cluster modes.

          hshreedharan Hari Shreedharan added a comment -

          The way the driver "finds" the executors would be common for all the scheduling systems (it should really be independent of the scheduling/deployment). I agree about the auth part too.

          Tathagata Das mentioned there is something similar already in standalone. I'd like to concentrate on YARN - if someone else is interested in Mesos please feel free to take it up!

          I posted an initial patch for Client mode to simply keep the executors around (though it is not exposed via SparkSubmit which we can do once we can get the whole series of patches in).

          For YARN mode, does that mean the method calls have to be via reflection? I'd assume so.

          The reason I mentioned doing it via HDFS and then pinging the executors is to make it independent of YARN/Mesos/Standalone - we can just do it via StreamingContext and make it completely independent of the backend on which Spark is running (I am not even sure this should be a valid option for non-streaming cases, as it does not really add any value elsewhere).

          tgraves Thomas Graves added a comment -

          Yes that probably means using reflection.

          I think having a file based one makes sense so we don't have other dependencies if you don't need them. You can always make it more complex and use zookeeper for those who want to install it. For yarn you could save it in the .sparkStaging directories along with the application jars that way it knows where to find it.

          You still have the question of how authentication works. This would require either the secret key being stored somewhere in hdfs also (and protected) or some other way for executors to allow connections and figure out this is a restart.
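Thomas's point about protecting a stored secret can be illustrated with a small sketch. This uses the local filesystem as a stand-in for HDFS (on real HDFS you would set restrictive permissions through the FileSystem API instead); the function names are invented for illustration.

```python
import os

def write_protected_secret(path, secret: bytes):
    """Write the shared secret so only the owning user can read it,
    analogous to protecting a file in the job's staging directory."""
    # 0o600: read/write for owner only; group and others get nothing.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        os.write(fd, secret)
    finally:
        os.close(fd)

def read_secret(path) -> bytes:
    """Read the secret back, e.g. when a restarted driver needs it."""
    with open(path, "rb") as f:
        return f.read()
```

This only covers filesystem-level protection; as the comment notes, the harder question is how executors decide to accept a connection from a restarted driver at all.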

          jerryshao Saisai Shao added a comment -

          Hi Hari, I have some high level questions about this:

          1. In the design doc, you mentioned "Once the RDD is generated, the RDD is checkpointed to HDFS - at which point it is fully recoverable". I'm not sure whether you checkpoint only the metadata of the RDD or also the data? I think RDD checkpointing is a little expensive for each batch if the batch duration is quite short.
          2. If we keep executors alive when the driver dies, do we still need to keep receivers receiving data from the external source? If so, I think there may potentially be some problems: firstly, memory usage will accumulate since no data is consumed; secondly, when the driver comes back, how do we balance the data processing priority? Old data needs to be processed first, which will delay processing of newly arriving data and lead to unwanted issues if latency is larger than the batch duration.
          3. In some scenarios we need to operate on a DStream together with an RDD (like joining real-time data with a history log). Normally the RDD is cached in the BlockManager's memory; I think we also need to recover this RDD's metadata, not only the streaming data, if we need to recover the processing.

          Maybe there are many other details we need to think about, because driver HA is quite complex. Please correct me if I have misunderstood something. Thanks a lot.

          hshreedharan Hari Shreedharan added a comment -

          Thomas Graves - Thanks for the pointers. Yes, using HDFS also allows us to use the same file, with some protection, to store the keys. This is something that might need some design and discussion first.

          I will also update the PR with the reflection code.

          Saisai Shao:
          1. Today RDDs already get checkpointed at the end of every job when the runJob method gets called. Nothing is changing here. The entire graph does get checkpointed today already.
          2. No, this is something that will need to be taken care of. When the driver dies, blocks can no longer be batched into RDDs - which means generating blocks without the driver makes no sense. Also, when the driver comes back online, new receivers get created, which would start receiving the data now. The only reason the executors are being kept around is to get the data in their memory - any processing/receiving should be killed.
          3. Since it is an RDD, there is nothing that stops it from being recovered, right? It is recovered by the usual method of regenerating it. Only DStream data that has not been converted into an RDD is really lost - so getting the RDD back should not be a concern at all (of course, the cache is gone, but it can get pulled back into cache once the driver comes back up).

          jerryshao Saisai Shao added a comment -

          Hi Hari Shreedharan, one more question:

          Is your design goal trying to fix data loss caused by receiver node failure? It seems data could potentially be lost when it is only stored in the BlockGenerator, not yet in the BlockManager, when the node fails. Your design doc mainly focuses on driver failure, so what are your thoughts?

          hshreedharan Hari Shreedharan added a comment -

          Yes - my initial goal is to be able to recover all the blocks that have not been made into an RDD yet (at which point the data would be safe). There is data which may not have become a block yet (data added using the += operator). For now, I am going to call it fair game to say that we will add storeReliably(ArrayBuffer/Iterable) methods, which are the only ones that store data such that it is guaranteed to be recoverable.

          At a later stage, we could use something like a WAL on HDFS to recover even the += data, though that would affect performance.

          hshreedharan Hari Shreedharan added a comment -

          Thomas Graves - Am I correct in assuming that using Akka automatically gives the shared secret authentication if spark.authenticate is set to true - if the AM is restarted by YARN itself (since it is the same application, it theoretically has access to the same shared secret and thus should be able to communicate via Akka)?

          tgraves Thomas Graves added a comment -

          On YARN, it generates the secret automatically. In cluster mode, it does it in the ApplicationMaster; since it is generated there, it goes away when the ApplicationMaster dies. If the secret was generated on the client side and populated into the credentials in the UGI, similar to how we do tokens, then a restart of the AM in cluster mode should be able to pick it back up.

          This won't work for client mode though since the client/spark driver wouldn't have a way to get ahold of the UGI again.

          hshreedharan Hari Shreedharan added a comment -

          I am less worried about client mode, since most streaming applications would run in cluster mode. We can make this available only in the cluster mode.

          hshreedharan Hari Shreedharan added a comment -

          Looks like simply moving the code that generates the secret and sets in the UGI to the Client class should take care of that.

          jerryshao Saisai Shao added a comment -

          Hi Hari Shreedharan, thanks for your reply - is this PR (https://github.com/apache/spark/pull/1195) the one you mentioned about storeReliably()?

          As I understand it, this API aims to store a bunch of messages into the BlockManager directly to make them reliable. But for some receivers, like Kafka, socket, and others, data is injected message by message; we can't call storeReliably() for each message because of efficiency and throughput concerns, so we need to buffer the data locally up to some amount and then flush it to the BlockManager using storeReliably(). So I think data can potentially be lost while it is buffered locally. These days I have been thinking about the WAL approach; IMHO a WAL would be a better solution compared to a batched store API.
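The buffering trade-off Saisai describes - accumulate messages locally, then flush each batch with one reliable call - might look like this minimal sketch. The class name and the `reliable_sink` callable are invented; the sink stands in for a hypothetical storeReliably-style API.

```python
class BufferedReliableStore:
    """Buffer incoming messages locally and flush them to reliable storage in
    batches. Only messages still sitting in the local buffer are at risk if
    the node fails - exactly the window this discussion is about."""

    def __init__(self, flush_threshold, reliable_sink):
        self.flush_threshold = flush_threshold
        self.sink = reliable_sink  # one reliable call per batch
        self.buffer = []

    def store(self, message):
        """Cheap local append; triggers a reliable flush at the threshold."""
        self.buffer.append(message)
        if len(self.buffer) >= self.flush_threshold:
            self.flush()

    def flush(self):
        """Hand the whole batch to the reliable sink, then clear the buffer."""
        if self.buffer:
            self.sink(list(self.buffer))
            self.buffer.clear()
```

Raising the threshold improves throughput but widens the loss window, which is the tension between the batched-store API and a per-record WAL.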

          hshreedharan Hari Shreedharan added a comment -

          Hi Saisai,

          You are correct that there would be a latency increase, but that is a cost to be paid for reliability. I want to get at least the first part (storeReliably or equivalent) right before going into the WAL implementation.

          hshreedharan Hari Shreedharan added a comment -

          Thomas Graves - It looks like the SecurityManager class already persists the key to the UGI when the AM starts up the first time. A restarted AM would be able to get the key from the UGI anyway (that is true even today - where the AM has access to the key on restart anyway) - correct? So I don't know if there is a need to change anything in the security model (unless I am missing something less obvious)

          hshreedharan Hari Shreedharan added a comment -

          (I am not too familiar with how UGI gets passed around if it does at all)

          hshreedharan Hari Shreedharan added a comment -

          Correct me if I am wrong here, it looks like what I'd need to do is:

          • Create the key, add it to credentials in the client
          • Then these credentials get written out in the setUpSecurityToken method.

          When the AM restarts it has access to these credentials once again (and they get shipped to the executors when they are started by the AM).

          How I wish the Hadoop security model were simpler

          hshreedharan Hari Shreedharan added a comment -

          Looks like this should be enough, correct?

          Since the SecurityManager constructor seems to generate and set the secret to the credentials which then gets written out to the ContainerLaunchContext. This should make sure that a restarting AM has the same credentials.

          tgraves Thomas Graves added a comment -

          yes that should be enough.

          hshreedharan Hari Shreedharan added a comment -

          FYI here is the branch where I am doing development on this: https://github.com/harishreedharan/spark/tree/streaming-ha

          Off topic, in Intellij, is there a way to get the yarn/stable stuff to recognize their base classes in common so we can get autocomplete and syntax highlighting (even type awareness) to work properly?

          srowen Sean Owen added a comment -

          Hari Shreedharan Just manually add the src dir in the parent to the module in IntelliJ. It'd be cooler if it was automatic, but not hard. There have been fixes proposed but I assume this is likely to go away as a problem only when yarn-alpha goes away.

          hshreedharan Hari Shreedharan added a comment -

          Sean Owen Thanks! That fixed the issue! That saved me a whole lot of time!

          hshreedharan Hari Shreedharan added a comment -

          It looks like Akka makes it difficult to connect back to a client (in this case a BlockManagerSlaveActor) from a new server (in this case, the BlockManagerMasterActor). Since ActorRefs are serializable, I am going to serialize the ActorRefs of the BlockManagerSlaveActors to the HDFS location rather than their locations - so we can simply start up from that to connect to the slaves.

          pwendell Patrick Wendell added a comment -

          I think for this it's worth considering a design that solves H/A using simpler mechanisms (for instance, adding a write-ahead-log for received data). Also, with this proposal, what happens if both a driver and an executor fail at the same time?

          hshreedharan Hari Shreedharan added a comment -

          At least one executor containing each block needs to be available when the driver comes back up - that is pretty much it. Since each block is replicated, unless all 3 executors holding a block fail, it will not lose data.

          A WAL would be necessary to recover data which has not been pushed as blocks yet (look at the store(Any) method). Adding a persistent WAL is going to hit performance, especially if the WAL has to be durable (you'd need to do an hflush if the WAL is on HDFS, or its equivalent on any other system). So you'd be paying for persisting the data when each block is created, whereas in this case you pay only at startup and driver restarts. Even the amount of data transferred is very small, since it is just metadata. If the WAL is not durable, then there is no guarantee it would be recoverable. If the WAL is local to each executor somehow, you'd still have to send all the block info to the driver when it comes back up.

          TD and I had discussed the WAL approach and felt it is actually more complex and might affect performance more than this one. In this case, all the building blocks are already there (since we already know how to get block infos from executors which hold on to the blocks). We just need to add Akka messages to ask the executors to re-send block metadata.
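The metadata-only recovery Hari describes can be sketched as follows. This is an illustration, not Spark code: on driver restart, executors re-send only small block-metadata messages, and a block is lost only if no surviving executor holds a replica.

```python
def rebuild_block_map(expected_blocks, executor_reports):
    """Rebuild the driver-side block-location map from metadata re-sent by
    surviving executors. `expected_blocks` is the set of block IDs the
    restarted driver recovered from its checkpoint; `executor_reports` maps
    executor ID -> list of block IDs that executor still holds in memory."""
    block_locations = {b: [] for b in expected_blocks}
    for executor_id, blocks in executor_reports.items():
        for block_id in blocks:
            if block_id in block_locations:
                block_locations[block_id].append(executor_id)
    # A block is recoverable as long as at least one replica survived.
    lost = [b for b, execs in block_locations.items() if not execs]
    return block_locations, lost
```

Only IDs cross the wire here, which is why the restart-time cost stays small; the trade-off (as Matei's reply points out) is depending on replication and reconnection working correctly.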

          matei Matei Zaharia added a comment -

          Hari, have you actually benchmarked a WAL based on HDFS? Recently we've discovered a number of bugs with block replication in Spark, and this plus the complexity of making executors reconnect make the WAL a much more attractive design short-term. I don't know if you have a more detailed design doc, but the work for reconnecting executors is quite a bit more involved than what the doc here suggests. For example, you need to make sure that the new driver uses a distinct set of RDD IDs, shuffle IDs, block IDs, etc from the old one, and you need to make sure that executors find the newest driver at all times (e.g. if one restarts and then immediately fails). I actually implemented a prototype of it when we were working on Spark Streaming, but I never pushed it into mainline Spark because of these issues.

          Longer-term, I hope that a lot of this issue will be handled by better treatment of reliable input sources, in particular Kafka. If we were able to replay lost data from Kafka nicely (which is hard with its current low-level API, but will hopefully become easy later), people would have a reliable real-time source to get data from, in addition to the higher-latency source currently available in HDFS. Then we would only need this WAL for other data sources, such as Twitter, where the source is not reliable, and the pressure on it for throughput would be much lower.

          hshreedharan Hari Shreedharan added a comment -

          Thanks Matei for the background. I had considered some of these factors (like executors always talking to the latest driver) - but I was not aware of the need for distinct RDD IDs etc.

          TD and I discussed this offline and we agreed that the WAL would probably be the best way to go. I am planning to do some benchmarking of appending data to a 5-node HDFS cluster on EC2 today. Considering that HBase does use a WAL on HDFS, my expectation is that the perf should be reasonable.

          I will post the benchmark application on GitHub and share a link here, then run it and post the results here as well.

          matei Matei Zaharia added a comment -

          Great, it will be nice to see how fast this is. I also think the rate per node doesn't need to be enormous for this to be useful, since we can also parallelize receiving over multiple nodes.

          jerryshao Saisai Shao added a comment -

          Strongly agree with Matei's comment. I think we can refer to Storm's design and route lost messages back to the receivers, then replay those lost messages if the external source supports replay, like Kafka. The WAL would be another option for unreliable sources; users can balance throughput against reliability by toggling the WAL.

          hshreedharan Hari Shreedharan added a comment -

          So I did some benchmarking on EC2, writing to files one after another with a 200 ms gap between hflushes. The hflush times, in milliseconds, were:

          Writes for stream 1: 30,61,75,89,68,4,65,59,92,261,3,66,86,81,96,4,75,64,79,82,69,2,91,69,65,80
          Writes for stream 2: 58,65,68,75,4,79,89,110,73,76,3,66,74,70,111,3,80,132,97,72,120,2,182,91,70,62
          Writes for stream 3: 68,74,79,67,4,67,82,97,109,3,104,56,65,81,3,57,61,57,2,76,61,59,62
          Writes for stream 4: 94,88,93,82,4,116,89,74,66,3,61,79,73,70,3,68,83,106,70,3,73,70,71,76
          Writes for stream 5: 66,67,83,63,3,70,77,110,80,69,3,83,75,67,65,4,73,70,97,2,56,63,79,105
          Writes for stream 6: 62,68,62,69,3,64,61,72,3,72,62,76,72,4,58,138,77,66,1,62,93,71,107
          Writes for stream 7: 82,63,94,80,4,121,117,69,74,3,80,70,66,63,3,59,69,68,70,1,59,130,75,96
          Writes for stream 8: 93,80,269,66,4,73,106,95,143,3,90,72,65,89,3,62,75,65,82,2,76,57,68,108
          Writes for stream 9: 132,75,59,78,4,70,66,66,71,3,60,75,89,4,78,84,76,74,1,73,63,67,88
          Writes for stream 10: 80,70,95,76,3,145,146,85,101,4,157,83,70,82,4,72,73,159,121,3,92,82,69,74


          Here is the code I used to benchmark: https://github.com/harishreedharan/hdfs-benchmark

          You can run it using a command line that looks like:

          mvn test -Dpath=hdfs://xy.example.com/data/op -DbufferSize=1024 -Dtotal=100000 
          

          Time in between flushes defaults to 200ms, which can be set using -DflushInterval=500 (in millis).

          In most cases, an hflush takes between 50 and 100 ms, which seems acceptable (there are outliers). These numbers are a bit noisy, though, since I was running on EC2 - not on physical boxes.
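
          To sanity-check that 50-100 ms claim, here is a small standalone sketch that summarizes one of the series; the stream 1 latencies are copied verbatim from the list above, and the percentile helper is a simple nearest-rank approximation:

          ```scala
          object HflushStats {
            // hflush latencies in milliseconds for stream 1, copied from the list above
            val stream1: Seq[Int] = Seq(30, 61, 75, 89, 68, 4, 65, 59, 92, 261, 3, 66, 86,
              81, 96, 4, 75, 64, 79, 82, 69, 2, 91, 69, 65, 80)

            def mean(xs: Seq[Int]): Double = xs.sum.toDouble / xs.size

            // nearest-rank style percentile on the sorted data
            def percentile(xs: Seq[Int], p: Double): Int = {
              val sorted = xs.sorted
              sorted(math.min(sorted.size - 1, (p * sorted.size).toInt))
            }

            def main(args: Array[String]): Unit =
              println(f"mean=${mean(stream1)}%.1f ms median=${percentile(stream1, 0.5)} ms p95=${percentile(stream1, 0.95)} ms")
          }
          ```

          The mean and median both land around 70 ms, so the 50-100 ms characterization holds up for this series apart from the two 200+ ms outliers.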

          I'd prefer the WAL option, so as not to tightly couple Spark's reliability with Kafka. We should make it pluggable - so we can replace the WAL with something that uses information from Kafka if Kafka is being used.
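
          The pluggable contract could look roughly like the sketch below. All names here are illustrative, not the API that was eventually merged into Spark; the in-memory implementation exists only to show the contract, while an HDFS-backed one would append to rolling log files and hflush, and a Kafka-backed one could store offsets rather than the data itself:

          ```scala
          import scala.collection.mutable.ArrayBuffer

          // Hypothetical pluggable write ahead log interface (names are illustrative)
          trait WriteAheadLog {
            def write(record: Array[Byte]): Long   // returns a handle for later replay
            def read(handle: Long): Array[Byte]    // replay a single record on recovery
            def clean(olderThan: Long): Unit       // drop records no longer needed
          }

          // Trivial in-memory implementation, useful only to illustrate the contract
          class InMemoryWriteAheadLog extends WriteAheadLog {
            private val records = ArrayBuffer[Array[Byte]]()
            def write(record: Array[Byte]): Long = { records += record; records.size - 1L }
            def read(handle: Long): Array[Byte] = records(handle.toInt)
            def clean(olderThan: Long): Unit = ()  // no-op for the in-memory sketch
          }
          ```

          With this shape, the receiver path only depends on the trait, so swapping in a source-specific implementation is a configuration choice rather than a code change.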

          hshreedharan Hari Shreedharan added a comment -

          Reducing the buffer size decreases the number of hflushes per file (total time taken per file is less), but each hflush takes longer as more data is buffered locally (I guess).

          hshreedharan Hari Shreedharan added a comment -

          Do these numbers look OK to you, Tathagata Das, Matei Zaharia, Patrick Wendell? If you want to experiment, you can use the app and play around with it. I don't think this number is too bad - though I don't know how the current block replication code performs, I'd estimate it to be in the tens of milliseconds.

          matei Matei Zaharia added a comment -

          So Hari, what is the maximum sustainable rate in MB/second? That's the number we should be looking for. I think a latency of 50-100 ms to flush is fine, but we can't be writing just 5 Kbytes/second.

          hshreedharan Hari Shreedharan added a comment -

          I did multiple rounds of testing, and on average the total rate for writing and flushing is around 100 MB/s. There are a couple of outliers, but those are likely due to flaky networking on EC2. Barring the one outlier, the lowest I got was 79 MB/s and the highest 142 MB/s, but most runs were near 100.
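
          As a back-of-envelope check (the buffer size and cycle time below are assumed round numbers, not measurements from the benchmark), the sustained rate is just the data moved per write+hflush cycle divided by the cycle time:

          ```scala
          // Illustrative arithmetic only; inputs here are assumptions, not benchmark data
          object SustainedRate {
            // MB moved per write+hflush cycle divided by cycle time gives sustained MB/s
            def mbPerSec(bufferMb: Double, cycleMs: Double): Double =
              bufferMb / (cycleMs / 1000.0)
          }
          ```

          For example, flushing an 8 MB buffer every ~80 ms would sustain about 100 MB/s, which lines up with the range reported above.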

          matei Matei Zaharia added a comment (edited)

          Is that 100 MB/s per node or in total? That should be pretty good for per-node if it scales well to a cluster.

          hshreedharan Hari Shreedharan added a comment -

          It is per node, single threaded.

          matei Matei Zaharia added a comment -

          Alright, in that case, this sounds pretty good to me. I would go ahead with this version. Please coordinate with Tathagata Das as well since he's been looking into this.

          hshreedharan Hari Shreedharan added a comment -

          Sure. Thanks Matei!

          tdas Tathagata Das added a comment -

          I am marking this as fixed, as all non-test related issues have been merged. The one sub-task left is unit tests that use the WAL to do end-to-end tests and verify no data loss.


            People

            • Assignee: tdas Tathagata Das
            • Reporter: hshreedharan Hari Shreedharan
            • Votes: 1
            • Watchers: 13
