Hadoop HDFS / HDFS-1595

DFSClient may incorrectly detect datanode failure

    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: datanode, hdfs-client
    • Labels: None

      Description

      Suppose a source datanode S is writing to a destination datanode D in a write pipeline. We have an implicit assumption that if S catches an exception when it is writing to D, then D is faulty and S is fine. As a result, DFSClient will take D out of the pipeline, reconstruct the write pipeline with the remaining datanodes, and then continue writing.

      However, we found a case where the faulty machine F is actually S, not D. In the case we found, F has a faulty network interface (or a faulty switch port) in such a way that the faulty network interface works fine when transferring a small amount of data, say 1MB, but it often fails when transferring a large amount of data, say 100MB.

      It is even worse if F is the first datanode in the pipeline. Consider the following:

      1. DFSClient creates a pipeline with three datanodes. The first datanode is F.
      2. F catches an IOException when writing to the second datanode. Then, F reports that the second datanode has an error.
      3. DFSClient removes the second datanode from the pipeline and continues writing with the remaining datanode(s).
      4. The pipeline now has two datanodes but (2) and (3) repeat.
      5. Now, only F remains in the pipeline. DFSClient continues writing with one replica in F.
      6. The write succeeds and DFSClient is able to close the file successfully.
      7. The block is under-replicated. The NameNode schedules replication from F to some other datanode D.
      8. The replication fails for the same reason. D reports to the NameNode that the replica in F is corrupted.
      9. The NameNode marks the replica in F as corrupted.
      10. The block is now corrupt since no valid replica is available.

      We were able to manually divide the replicas into small files and copy them out from F without fixing the hardware. The replicas seem uncorrupted. This is a data availability problem.

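      To make the failure mode above concrete, here is a minimal sketch of the blame logic just described. The class and method names are hypothetical illustrations, not the actual DFSClient code:

          import java.util.ArrayList;
          import java.util.List;

          // Hypothetical sketch: when a write from one pipeline node to the next
          // fails, the downstream node is blamed and dropped, even though the
          // sender (or its network interface) may be the real culprit.
          class PipelineBlameSketch {
              static List<String> handleWriteFailure(List<String> pipeline, int failedIndex) {
                  List<String> rebuilt = new ArrayList<>(pipeline);
                  rebuilt.remove(failedIndex);  // blame and drop the downstream node
                  return rebuilt;               // the client then resumes writing
              }
          }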

          Activity

          Tsz Wo Nicholas Sze added a comment -

          Here is my thought for fixing the problem:

          When datanode S transfers data to datanode D and the transfer fails, neither D nor S can tell which machine is faulty, except in some obvious cases (e.g., a datanode finds itself having a problem). Thus, the current mechanism, which tries to detect the faulty datanode in all cases, cannot work.

          How about we change DFSClient so that, if the replication factor is greater than one, it stops writing when only one datanode remains in the pipeline, because that datanode may be faulty?

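          A minimal sketch of this proposal, assuming simplified stand-in names rather than the real DFSClient fields:

              import java.io.IOException;

              // Hypothetical sketch: refuse to continue a write on a lone,
              // possibly faulty, datanode when more replicas were requested.
              class PipelineGuard {
                  static void checkPipeline(String[] remainingNodes, int replication) throws IOException {
                      if (replication > 1 && remainingNodes.length == 1) {
                          throw new IOException("Pipeline reduced to a single datanode ("
                              + remainingNodes[0] + "); aborting since it may itself be faulty");
                      }
                  }
              }
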
          Todd Lipcon added a comment -

          Could we solve this with a blacklist-like feature? I.e., when a pipeline detects an issue, it sends the pair (S, D) to the NameNode. Then the NN can use this info to exclude DNs which have lots of issues (and, even better, expose it to ops so they can check for misconfiguration or hardware issues)?

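          A rough sketch of how such pair reports might be tallied on the NameNode side; the class and threshold here are illustrative, not an existing API:

              import java.util.Map;
              import java.util.concurrent.ConcurrentHashMap;

              // Hypothetical sketch: count how often each datanode appears in a
              // reported (source, destination) failure pair; a node implicated in
              // many pairs, as either end, becomes a blacklist candidate for ops.
              class PipelineFailureTally {
                  private final Map<String, Integer> implicated = new ConcurrentHashMap<>();

                  void reportFailure(String src, String dst) {
                      implicated.merge(src, 1, Integer::sum);
                      implicated.merge(dst, 1, Integer::sum);
                  }

                  boolean isSuspect(String datanode, int threshold) {
                      return implicated.getOrDefault(datanode, 0) >= threshold;
                  }
              }
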
          Koji Noguchi added a comment -

          Can we describe the problem users faced before describing how it happened in detail?

          Problem: We got over 200 corrupted blocks due to one unstable datanode. The write/close calls for these files were successful from the DFSClients' point of view, which made it quite bad.

          Todd Lipcon added a comment -

          How about we change DFSClient so that, if the replication factor is greater than one, it stops writing when only one datanode remains in the pipeline, because that datanode may be faulty?

          Come to think of it, isn't that what dfs.replication.min does – at least makes close() block until there are two replicas, so in this case you wouldn't have gotten a successful close. Though, I guess this setting is cluster-wide rather than per-stream.

          Tsz Wo Nicholas Sze added a comment -

          > Can we describe the problem users faced before describing how it happened in detail?

          Thanks for adding that.

          Tsz Wo Nicholas Sze added a comment -

          Could we solve this with a blacklist-like feature? ie when a pipeline detects an issue, it sends the pair (S, D) to the NameNode....

          This is a good idea to identify faulty datanodes. However, it does not prevent data loss.

          Come to think of it, isn't that what dfs.replication.min does ...

          There is a subtle difference: if we set dfs.replication.min = 2, you are right that DFSClient.close() will wait until two replicas have been reported to the NameNode. However, all pipelines, healthy or faulty, have to wait for two replicas. Consequently, the normal case is penalized and performance degrades.

          Tsz Wo Nicholas Sze added a comment -

          How about we change DFSClient so that, if the replication factor is greater than one, it stops writing when only one datanode remains in the pipeline, because that datanode may be faulty?

          I mean only when the specified replication factor is >= 3. We allow one datanode when the replication factor is < 3.

          Todd Lipcon added a comment -

          Consequently, the normal case is penalized and performance degrades

          True. I think, though, that failing a write when we drop to replication=1 is also problematic - we have three nodes in the pipeline because we want to tolerate two failures.

          What about something like this: if during completeBlock() call to the NN, there is only one target, but desired replication > 1, then we act as if replication.min == 2 and block the client until it's been replicated? This way we don't have the penalty on good pipelines but we still act cautiously for bad ones.

          Tsz Wo Nicholas Sze added a comment -

          What about something like this: if during completeBlock() call to the NN, there is only one target, but desired replication > 1, then we act as if replication.min == 2 and block the client until it's been replicated? This way we don't have the penalty on good pipelines but we still act cautiously for bad ones.

          This sounds like a good alternative. I need to think about it carefully.

          Tsz Wo Nicholas Sze added a comment -

          What about something like this: if during completeBlock() call to the NN, ...

          There is no completeBlock() call to the NN. We only have a complete(..) call to the NN, for closing a file.

          ... we have three nodes in the pipeline because we want to tolerate two failures.

          Unfortunately, the current implementation can tolerate only some, but not all, two-failure cases.

          If we really want to tolerate two failures without risking data loss, we may

          • set the replication factor to 4; or
          • recruit a new datanode when a datanode is removed (see the sketch below).
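
          A minimal sketch of the second option, with hypothetical names (the NameNode RPC and the data transfer to the newcomer are elided):

              import java.util.List;

              // Hypothetical sketch: replace a failed pipeline node instead of
              // shrinking the pipeline, keeping failure tolerance constant.
              class PipelineRecoverySketch {
                  interface NameNodeStub {
                      String allocateReplacement(List<String> exclude);
                  }

                  static void replaceFailedNode(List<String> pipeline, int failedIndex,
                                                NameNodeStub nn) {
                      pipeline.remove(failedIndex);
                      // Data written so far must be copied to the newcomer
                      // before the client resumes writing.
                      pipeline.add(nn.allocateReplacement(pipeline));
                  }
              }
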
          Todd Lipcon added a comment -

          There is no completeBlock() call to the NN. We only have a complete(..) call to the NN, for closing a file

          Sorry, I meant completeFile() and getAdditionalBlock(). Both call BlockManager.commitOrCompleteLastBlock - this could check the pipeline length and use a value of 2 instead of minReplication if the pipeline length is 1?

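          A minimal sketch of that check, assuming illustrative names (per the comment above, the real hook would live near BlockManager.commitOrCompleteLastBlock):

              // Hypothetical sketch: if the pipeline shrank to one node but the
              // file wants more replicas, require two reported replicas before
              // the last block may be committed or completed.
              class CompletionCheckSketch {
                  static int effectiveMinReplication(int pipelineLength, int desiredReplication,
                                                     int configuredMinReplication) {
                      return (pipelineLength == 1 && desiredReplication > 1)
                          ? Math.max(2, configuredMinReplication)
                          : configuredMinReplication;
                  }
              }
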
          Tsz Wo Nicholas Sze added a comment -

          > Sorry, I meant completeFile() and getAdditionalBlock(). ...

          Okay, I think you do mean ClientProtocol.complete(..) and ClientProtocol.addBlock(..).

          Do you mean changing ClientProtocol.complete(..) and ClientProtocol.addBlock(..) so that DFSClient can tell the NameNode to wait for max(2, minReplication) replicas when there is only one datanode in the pipeline?

          Yes, your idea should work.

          Tsz Wo Nicholas Sze added a comment -

          The third solution would be recruiting a new datanode during pipeline recovery.

          Todd Lipcon added a comment -

          Since I wasn't doing a good job of explaining my idea, I just wrote a quick patch which should illustrate what I meant. I haven't tested this at all, but it should explain the thought.

          Resurrecting the pipeline with more replicas is a nice idea but I imagine it will be super-complicated, no?

          Tsz Wo Nicholas Sze added a comment -
          +    int pipelineReplication = biuc.getNumExpectedLocations();
          

          Hi Todd, I might be wrong, but I think it is not as simple as in hdfs-1595-idea.txt. The value of pipelineReplication above is not equal to the actual number of datanodes in the pipeline. In other words, it is not updated when datanodes are removed from the pipeline.

          We need to change some protocol in order to implement this idea.

          Todd Lipcon added a comment -

          I'll be honest: I don't know the new Append code in trunk very well. I thought the client called nn.updatePipeline() whenever a node was removed from the pipeline. That's not the case?

          Tsz Wo Nicholas Sze added a comment -

          > I'll be honest: I don't know the new Append code in trunk very well. I thought the client called nn.updatePipeline() whenever a node was removed from the pipeline. That's not the case?

          You are actually right about the new append code. I am sorry; I was mostly looking at the 0.20 code. It will work for 0.22, but we would need a protocol change for 0.20. Nice.

          Hairong Kuang added a comment -

          > In the case we found, F has a faulty network interface (or a faulty switch port) in such a way that the faulty network interface works fine when sending out a small amount of data, say 1MB, but it fails when sending out a large amount of data, say 100MB.

          So this faulty node F has no problem receiving a large amount of data? That seems like an extreme case. Normally the client should get an error sending data to this faulty node F and thus remove F from the write pipeline.

          Tsz Wo Nicholas Sze added a comment -

          Yes, reading is working fine for any data size. (I also updated the description.)

          Kan Zhang added a comment -

          Looks like we have 3 options when the pipeline is reduced to a single datanode F.

          1. stop writing and fail fast.
          2. recruit a new set of datanodes to be added to F, with F still being the first datanode in the pipeline.
          3. finish writing the block to F and ask the NN to wait for 2 replicas before closing the block.

          1) has the drawback of failing even when F is healthy, which is undesirable. Both 2) and 3) will fail when F is bad, and both will likely succeed when F is healthy. Of the two, I'd prefer 2) over 3): how soon a replica can be copied from one datanode to another depends on many factors, so 3) is less predictable than 2), where the client actively sets up a new pipeline and resumes writing. One long-tail task that takes much longer to finish than the others impacts the total running time of the whole job.

          Todd Lipcon added a comment -

          Another option not mentioned above, which we use in HBase, is to periodically poll the pipeline from the "application code" to find out how many replicas are in it (there's an API to do that in trunk). If we see the pipeline drop below 3 replicas, we roll the output to a new file (hence a new pipeline). For something like a commit log, where we really just care about a stream of records, the actual file boundaries have no semantic meaning, so rolling is "free" and we get a full pipeline.

          For MR it's a bit trickier to do in general, but it might be useful for certain output formats if we can figure out how to make the APIs non-gross. E.g., if a reducer writing part-00000 gets a pipeline failure, it could continue writing to part-00000.1 or something.

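          Roughly, the HBase-style check looks like this. WriteAheadLog and its methods are hypothetical stand-ins for the application-side code, not actual HBase or HDFS API names:

              import java.io.IOException;

              interface WriteAheadLog {
                  int replicaCount() throws IOException;   // datanodes still in the write pipeline
                  void rollToNewFile() throws IOException; // close the current file, open a new one
              }

              class LogRoller {
                  // Poll periodically; rolling to a new file yields a fresh, full pipeline.
                  static void maybeRoll(WriteAheadLog log, int desiredReplicas) throws IOException {
                      if (log.replicaCount() < desiredReplicas) {
                          log.rollToNewFile();
                      }
                  }
              }
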
          Tsz Wo Nicholas Sze added a comment -

          > Resurrecting the pipeline with more replicas is a nice idea but I imagine it will be super-complicated, no?

          Yes, it is complicated. Nonetheless, it is invaluable for this JIRA, append, and other applications like the HBase use case you provided.

          Koji Noguchi added a comment -

          So this faulty node F has no problem receiving large amount of data?

          This faulty node had problems sending/receiving large amounts of data, failing most of the time.
          The bigger the data, the higher the chance of failure. I think smaller data (say, less than 1MB) was going through 99% of the time.
          So heartbeats, acks and so forth were probably working.

          When I tried to scp some blocks out from this node for data recovery, it kept on failing with

          ===========================
          blk_113193561174013799 0% 0 0.0KB/s --:-- ETA
          Corrupted MAC on input.
          Finished discarding for aa.bb.cc.dd
          lost connection

          ===========================

          So I believe most of the DFSClient writes going through this node were failing.
          And when one successfully went through (after hundreds of write attempts for different blocks), it would then fail on all the following replications but succeed on 'close' with 1 replica, leading to this bug.

          dhruba borthakur added a comment -

          It appears that Todd's proposal could work well to avoid this issue, do you agree Nicholas?

          It appears to me (please correct me if I am wrong) to be a data availability problem. The replica on F is actually still intact and the data is good there; it is just that clients are unable to read that data. Is it true that if the network card on F gets fixed, the data becomes available once again?

          Tsz Wo Nicholas Sze added a comment -

          It appears that Todd's proposal could work well to avoid this issue, do you agree Nicholas?

          Agree, especially for the new append.

          It appears to me (please correct me if I am wrong) to be a data availability problem. The replica on F is actually still intact and the data is good there; it is just that clients are unable to read that data. Is it true that if the network card on F gets fixed, the data becomes available once again?

          In the case we encountered, Koji was able to manually divide the blocks into small files and copy them out from F without fixing the hardware. It does seem that the data were good.

          Tsz Wo Nicholas Sze added a comment -

          Revised the description. Thanks Koji and Dhruba for correcting me.

          Tsz Wo Nicholas Sze added a comment -

          Todd, would you like to work on this with your idea?

          Todd Lipcon added a comment -

          Yea, I can take this, but it may be a bit before I can get to it - I'm mostly focusing on bug fixing at the moment (I would classify this as a "missing feature" more than a bug).

          Tsz Wo Nicholas Sze added a comment -

          Thanks Todd, but if you are busy, we may let someone else pick this up.

          Tsz Wo Nicholas Sze added a comment -

          Created HDFS-1606 for adding new datanodes to the write pipeline.

          dhruba borthakur added a comment -

          Error recovery is a pain when a datanode in a write pipeline fails. Sometimes it is truly difficult for the client to accurately determine which datanode failed. Does it make sense to change the algorithm itself? What are the tradeoffs if we say that, when the number of datanodes in the write pipeline decreases to min.replication, the client streams data directly to all remaining (or new) datanodes, instead of pipelining? If new datanodes fail, the client will find it easy to accurately determine which datanodes are dead.

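          A minimal sketch of the fan-out idea, with hypothetical types (packet framing, acks, and retries are elided):

              import java.io.IOException;
              import java.io.OutputStream;
              import java.util.List;

              // Hypothetical sketch: instead of pipelining, write each packet
              // directly to every remaining datanode; a failure then points
              // unambiguously at the node whose stream threw.
              class FanOutWriter {
                  static void writePacket(byte[] packet, List<OutputStream> datanodes) throws IOException {
                      for (int i = 0; i < datanodes.size(); i++) {
                          try {
                              datanodes.get(i).write(packet);
                          } catch (IOException e) {
                              throw new IOException("datanode #" + i + " failed", e);
                          }
                      }
                  }
              }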

            People

            • Assignee: Unassigned
            • Reporter: Tsz Wo Nicholas Sze
            • Votes: 1
            • Watchers: 12
