Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.3.0
    • Component/s: Data Collection
    • Labels:
      None

      Description

      We like to say that Chukwa is a system for reliable log collection. It isn't, quite, since we don't handle collector crashes. Here's a proposed reliability mechanism.

      Attachments

    1. delayedAcks.patch
        51 kB
        Ari Rabkin
    2. CHUKWA-369.patch
        79 kB
        Ari Rabkin

        Issue Links

          Activity

          Ari Rabkin added a comment -

          Right now, collectors just blindly send back OK after every chunk, even if the data isn't stable on disk. The OK is sent after the data is handed to a Writer, and therefore after Writer.add() returns. But Writer.add() is void, and so we get no verification that the write committed.

          I'd like to have Writer.add() return one of two things:
          either an OK, or else a "witness string", which gets passed back to the client. "OK" means that the data is now the collector's responsibility, and the agent should advance its checkpointed state.

          The witness string is an HDFS filename and a file length. Periodically, the agent checks the length of the file; if it exceeds the specified length, then the data has been committed to the file, and the agent can again advance its checkpoint. If the data hasn't committed within the specified period, then the agent stops all running adaptors and resumes from the last checkpoint.

          This is much easier to implement if we can assume a few things:
          1) A single collector will commit data from a single Agent in order.
          2) A single agent won't fail-over to a new collector unless the previous collector failed: therefore, even if writes are split across collectors, we're still guaranteed commit-in-order.
          3) Collector failures are rare, and therefore agents don't need to update their checkpoints all that often, and can safely rewind several minutes in the event of failure.

          All these assumptions are currently true; I just want to document them and explain clearly that they can't be violated without breaking the reliability mechanism.
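
          As a rough illustration of the agent-side length check described above (the class and method names here are illustrative, not the actual patch code), the test amounts to something like:

            // Sketch of the witness check: the agent compares the sink file's current
            // length in HDFS against the length named in the witness string.
            import org.apache.hadoop.fs.FileStatus;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            public class WitnessCheck {
              /** True once the sink file has grown to (or past) the witnessed length. */
              public static boolean isCommitted(FileSystem fs, String sinkFile, long witnessedLen)
                  throws java.io.IOException {
                Path p = new Path(sinkFile);
                if (!fs.exists(p)) {
                  return false;   // not there yet, or already rotated; caller also checks the .done name
                }
                FileStatus stat = fs.getFileStatus(p);
                return stat.getLen() >= witnessedLen;
              }
            }

          If the check hasn't succeeded by the timeout, the agent stops its adaptors and rewinds to the last checkpoint, as described above.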

          Ari Rabkin added a comment -

          One additional refinement: there are two broad classes of adaptors, those that can reliably recover data and those that can't. File tailers can; exec adaptors, socket listeners, etc., can't.

          The proposal is that adaptors that can't resume after a crash should explicitly update the checkpoint state when they send data. That way, after a crash, we'll get explicit gaps in the stream for those adaptors; it'll be obvious to downstream listeners what happened, and there won't be ambiguity about where the data went.

          This last change should maybe be its own JIRA – it's a pretty compact fix. I think only a single line.

          Eric Yang added a comment -

          Writer.add() throws an exception if there is a problem. I think that if the add method returns without an exception, it means the data was committed properly. If the data is not on disk, the bug should be fixed inside the add method rather than by changing the return type, because a return code does not improve validation. If it fails, throwing a WriterException is more appropriate.
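
          In other words (sketch only, with stand-in types rather than the real ChukwaWriter and servlet classes), the contract would be: ack only if add() returns normally.

            // Stand-in types for illustration; in Chukwa the real pair is the writer's
            // add() method and WriterException.
            class WriterException extends Exception {
              WriterException(String msg) { super(msg); }
            }

            interface Writer {
              void add(byte[] chunk) throws WriterException;   // throwing means "not committed"
            }

            class PostHandler {
              /** HTTP status the collector sends back for one chunk. */
              static int handle(Writer writer, byte[] chunk) {
                try {
                  writer.add(chunk);
                  return 200;   // committed: the agent may advance its checkpoint
                } catch (WriterException e) {
                  return 500;   // not committed: the agent retries
                }
              }
            }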

          Ari Rabkin added a comment -

          Eric –

          The difficulty is that with HDFS, there's no easy way to know whether data has committed: it can happen that a call to write() succeeds, but the collector then crashes before the data leaves the process. Flush in 0.21 may help, but I'm not sure we want to flush on every write. The purpose of the proposed mechanism is to decouple commit detection from the collector, and let the agent determine on its own whether the data committed.

          Eric Yang added a comment -

          There was another attempt to address this issue, and I think we could learn something from that past attempt. The main thing we learned was that flushing on every write while holding the client connection yields a poor-performing collector.

          LocalWriter was designed to handle this problem. Instead of writing to HDFS directly, it writes to the local file system and then puts the file onto HDFS. The main idea was to break the synchronization between agent, collector, and datanode while improving data reliability. When a collector crashes, it resumes processing from its local disk.

          The LocalWriter was not finished, but it shows some promising ideas for addressing the reliability problem. The major flaw was that writing to local disk was faster than writing to HDFS, and the result was frequently filling the collector's disk. The implementation could be improved by limiting local disk usage and no longer accepting additional chunks once the disk queue reaches its quota. This should improve collector reliability without using the synchronized pipeline.

          Ari Rabkin added a comment -

          I had thought LocalWriter was finished and working. What are the known limitations?

          As I see it, there are two problems with LocalWriter. First, you can run out of local disk. Second, if the collector crashes, data will be lost unless and until the collector comes back up; if the node dies, data is lost. The approach I'm suggesting is entirely flushless and avoids both problems. Also, it can coexist gracefully with the existing LocalWriter or with the base SeqFileWriter.

          Jerome Boulon added a comment -

          My 2 cents: the LocalWriter checks for disk space and will bail out if the free space drops below a quota:

            if (freeSpace < minFreeAvailable) {
              log.fatal("No space left on device, Bail out!");
              DaemonWatcher.bailout(-1);
            }

          So the LocalWriter should do the job for now.
          Regarding HDFS, there's work in progress to allow writing redo logs to HDFS; when that becomes available, we will take advantage of it.
          People who still want to use the HDFS writer can change some HDFS parameters to reduce the time before the HDFS client detects a problem.
          Facebook also uses two overlapping HDFS clusters to get HA from Scribe's point of view. The missing part on our side is a secondary writer, but that could easily be implemented.

          Eric Yang added a comment -

          The list-command check may be a scalability problem for the namenode with many collectors/agents. When there are thousands of agents issuing size checks, this will periodically spike the namenode. I think you will want to avoid this.

          Jerome is more familiar with LocalWriter, and it seems the quota feature is already committed. Perhaps you could try LocalWriter instead of SeqFileWriter?

          Ari Rabkin added a comment -

          @Jerome: I don't see how bailing out to avoid disk-full solves the problem of collectors crashing. The failure scenario I'm worried about is that LocalWriter writes the data, and then the collector dies in a non-recoverable way. The data on disk is now useless, and the Right Thing is for the agent to retransmit to a different collector.

          Certainly, HDFS improvements would help reduce this problem. But I think I can implement my proposal here in a week or so – and that gets us reliability even with previous versions of the filesystem.

          @Eric: The point about overloading the namenode is a fair one. Let me propose the following modification:

          • Instead of querying HDFS directly, agents should do a GET request to a collector. The collector has to do only a single listing every few minutes, and cache the results, to satisfy all the agents. This radically cuts down on traffic to the namenode, and it also isolates the Chukwa DFS from the agents. (A rough sketch follows below.)
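
          A rough sketch of that cache (the class and method names are hypothetical, not from the patch):

            // Hypothetical sketch: one periodic listing of the sink directory serves
            // every agent's length query, instead of each agent hitting the namenode.
            import java.util.Map;
            import java.util.concurrent.ConcurrentHashMap;
            import org.apache.hadoop.fs.FileStatus;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            public class SinkLengthCache {
              private final Map<String, Long> lengths = new ConcurrentHashMap<String, Long>();

              /** Refreshed from a timer every few minutes: a single ls per refresh. */
              public void refresh(FileSystem fs, Path sinkDir) throws java.io.IOException {
                for (FileStatus stat : fs.listStatus(sinkDir)) {
                  lengths.put(stat.getPath().getName(), stat.getLen());
                }
              }

              /** Answers an agent's GET: has the witnessed length been reached? */
              public boolean isCommitted(String fileName, long witnessedLen) {
                Long len = lengths.get(fileName);
                return len != null && len.longValue() >= witnessedLen;
              }
            }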
          Jerome Boulon added a comment -

          Bailing out will not solve the problem of a collector crashing, but it does solve the issue that Eric mentioned: a potential disk-full condition.

          @Ari : How do you know that your data is actually safe, aka committed to HDFS? Can you give us more details?

          Ari Rabkin added a comment -

          @Jerome:

          The proposal is as follows:
          1) In response to a PUT, the collector returns the filename and position in the sink file where the data will be written, if it gets written. Since files have exactly one writer, we're guaranteed that no other writer can write to that offset. And if the write succeeds, it'll be the write corresponding to that PUT.
          2) Some minutes later, the agent asks a collector, any collector, how long the indicated sink file (or corresponding .done file) is. If it's greater than the indicated length, the write succeeded.

          There's one small wrinkle.
          2a) If a .done was created and then removed by demux or archiving, collectors should continue to show it as having been written. There are a couple of ways to do this. For instance, collectors could also look in the archive input and output dirs to see if the .done file is there, and they could remember the .done files they saw previously, on the assumption that if a file ever existed, it's somewhere in the processing pipeline and the data is safe.

          Furthermore, if we go this route, we really ought to do something about "marooned" .chukwa files. Right now, if a collector crashes or is stopped, it leaves a .chukwa file in the sink; these files never get processed and never get deleted. Some other collector ought to rename them and make them available for processing. This is probably a good thing in general, but it isn't actually required for the reliability mechanism I'm proposing.

          Jerome Boulon added a comment -

          Regarding the issue with .chukwa files, the new LocalWriter takes care of this. Any file older than the rotation period plus one minute will be renamed and sent over to HDFS.

          @Ari: there's one thing I don't understand. Since there's more than one client writing to the same SeqFile, how do you know that the 2 additional MB you are seeing on the file are coming from client 1 and not client 2? Also keep in mind that, to improve performance, most of the time you will have to buffer data in memory first and then write to disk in big chunks.
          This is what HDFS does, and from what I know there's no easy way to figure out whether the data is still in memory or has been written to disk (at least for now).

          So unless you are able to keep track of the last SeqID per RecordType/agent on the collector side and then figure out what has been pushed to disk and what is still in memory, I don't see a way to send the right information back to the agent.

          Eric Yang added a comment -

          The HTTP return code should be the only contract between agent and collector. If the HTTP return code is 200, then the data should be managed by the collector from that point on. An async status check from agent to collector will only complicate things, because the collector could be busy and unable to answer the status-check request. That creates a domino effect of agents resending chunks, because the status check may fail more than once on a busy collector.

          As a summary of where things stand, there are three ways to solve the problem.

          1. A synchronized pipeline: the agent writes one minute's worth of data and waits for the collector to close the file at the one-minute mark before returning an HTTP code. If the collector does not close the file properly, no HTTP code is returned, and the agent resends that minute's worth of data (or everything since the last checkpoint). This depends on HDFS I/O performance; previous experience with 0.18 and 0.20 yielded around 20 MB/s.

          2. An asynchronous pipeline: it is difficult to track the progress of each agent across the collectors, there is a lot of memory overhead to keep track of agent status inside the collector, and status-check requests may go unanswered and cause frequent retransmission.

          3. Use LocalWriter to write data on the collector node first, and upload the data to HDFS asynchronously. The downside is that the collector disk is stressed; wear and tear on the collector disk could result in bad data being injected into HDFS without a CRC check. A collector disk crash means data is lost.

          There is really no perfect solution here, but option 1 is less error prone. As Hadoop improves performance, Chukwa benefits too.

          Ari Rabkin added a comment -

          @Jerome.

          I don't believe we can ever have multiple clients writing the same sequence file. Exactly one writer per HDFS file, and writer.add() is always called on behalf of exactly one client. So add() can return, unambiguously, the offset at which that client's data will be written.

          Yes, writes are asynchronous. But looking at the length of the file tells you whether the write committed. The whole point of what I'm proposing is to allow the check for whether the write succeeded to be asynchronous, too. There is no need to keep track of what's in memory. The only states are "data committed to the file, and therefore visible as written" and "data not yet committed, which we'll re-send if a timeout expires".

          @Eric:
          I don't think a status check once every five minutes, per agent, would be a problem. That's a small fraction of the number of POSTs that we do, and we already retransmit very aggressively when a collector returns a 500 error. So this shouldn't make things worse than they already are.

          The interface you're describing – where the collector takes responsibility for the data as soon as it returns an HTTP response – is incompatible with your option (1), because we're not willing to wait a whole minute before returning an HTTP response; we'd run out of request-handling threads in the server.

          I don't know what measurements you're citing. We never implemented option 1. What we've actually implemented is an unreliable version of (1), where the collector takes responsibility – and leaves data in RAM when it responds. The collector, using the code we've written, simply does not flush data before responding. So those benchmarks don't really apply. Likewise, the LocalFSWriter doesn't commit data before responding. The code we actually have returns "OK" before saving the data to disk.

          @both:

          I see there's substantial disagreement here, so I think it makes sense for me to implement and test at scale before submitting a patch for review. If I submit measurements, at scale, demonstrating what I have in mind, would that be likely to sway you?

          Ari Rabkin added a comment -

          Okay. I have this coded up and working locally. Users are able to choose between this new mechanism and the existing ones. I will test at Berkeley and report my performance results. I needed to do some fairly major refactoring of the Sender and Connector code, but I think it's a win in general.

          I am not yet proposing to commit this; the patch, as yet, is not intended for inclusion. Rather, I'm posting it to get comments on the overall structure and to clarify what I intend.

          Ari Rabkin added a comment -

          The patch has basically five pieces; I'm happy to split them up and commit them separately if some are uncontroversial.

          1) Sender and Connector are refactored to allow the HttpClient to be reused, and used more generally.
          2) Writers now return an instance of ChukwaWriter.CommitStatus. This is either OK, Failure, or Pending; the first two are singletons, and the latter includes a list of strings (sketched below).
          3) SeqFileWriter returns a CommitPending on writes.
          4) A new servlet, CommitCheckServlet, for periodically scanning HDFS.
          5) A new Sender, the AsyncAckSender, that doesn't automatically commit, but does so only when it either receives an OK or a pending commit becomes stable. The Sender periodically asks a CommitCheckServlet what has been committed.

          I think (1), and possibly (2) and (3), may make sense even without (4) and (5), which are the bits that need serious testing before we should even discuss committing them.
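
          For concreteness, the shape of (2) is roughly the following (the field and method details here are a sketch, not the exact patch code):

            // Sketch of the three-way commit status returned by writers.
            import java.util.Collections;
            import java.util.List;

            public abstract class CommitStatus {
              public static final CommitStatus OK = new CommitStatus() {};       // committed synchronously
              public static final CommitStatus FAILURE = new CommitStatus() {};  // write failed

              /** Pending carries the strings the agent will later check against HDFS. */
              public static class Pending extends CommitStatus {
                private final List<String> witnesses;   // e.g. sink file name plus target length
                public Pending(List<String> witnesses) {
                  this.witnesses = Collections.unmodifiableList(witnesses);
                }
                public List<String> getWitnesses() { return witnesses; }
              }
            }

          Per (3), SeqFileWriter would return a Pending naming the sink file and the offset just past the written chunk; the AsyncAckSender in (5) holds the corresponding checkpoint until that length becomes visible.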

          Eric Yang added a comment -

          I am still uncomfortable with this change because the proposed patch suggests a stateful collector. This means it will not scale well across many collectors. AsyncAckSender doesn't seem able to handle a flapping network connection well.

          Ari Rabkin added a comment -

          Eric:

          The only state at collectors is soft, and it is regenerated periodically (depending on the scan frequency, but think every few minutes). It's totally okay if all the collectors crash and then restart; acks will be delayed for a while, but they'll eventually arrive.

          And all collectors share the same state, since they're ls-ing the same filesystem. So I don't see why flapping would be a problem.

          Eric Yang added a comment -

          Let's say the scan frequency is set to every minute, because we close files once per minute and demux at the same frequency, and we have 100 collectors (a typical Yahoo Chukwa cluster); this means the namenode needs to handle 1.4 ls commands per second. In my opinion, this design is too costly. The robustness of the HDFS writer should be handled in the Hadoop HDFS client or the collector. The namenode should not bear this cost.

          Ari Rabkin added a comment -

          Alright. It wouldn't be too hard to split "collectors" from "nodes that check for file completion" – we could have only a handful of the latter, drastically cutting down the load on the filesystem. Would that address your concern?

          But I'm curious. Did you really need 100 collectors? My understanding is that that was a very substantial overprovisioning. Each one can do 20 MB/sec. Do you really have 2 GB/sec of monitoring data?

          Eric Yang added a comment -

          What is the algorithm used to distinguish the written bytes of chunk 2 from server A from those of chunk 2 from server B, if the data are stacked together? There doesn't seem to be an easy way to identify whether one of the chunks is missing.

          Our deployment maps collectors to exactly the number of datanodes deployed. 100 KB/sec from 25,000 machines easily meets the 2 GB/sec figure. Chukwa is designed to process data in parallel with stateless transport; we need to ensure that this solution does not impact parallelism.

          Ari Rabkin added a comment -

          When you say "server a" and "server b", do you mean agents A and B, or collectors A and B?

          For the agent side first: the collector knows which chunk is written where in the file, and it tells each agent where in the file its data will be.

          For the collector side: there's exactly one collector per sink file, and when an agent does a POST, the collector tells it the filename to track.

          Alright. I take your point that it's a mistake to have every collector doing an LS. I can redesign so that only a handful of collectors are scanning the filesystem – reducing the load to an acceptable level. Would that address your concerns?

          Ari Rabkin added a comment -

          OK. I've now modified AsyncAckSender so that it can take a separate list of collectors that should be used for checking file lengths.

          But I just realized there are two deeper problems with my approach.

          1) Suppose that an Ack doesn't arrive. What then? The code to rewind adaptors to the last checkpoint and resume hasn't been written yet. But I think it's pretty straightforward.
          2) It's possible that an agent writes chunks 1, 2, and 3 to collector A, then fails over to collector B and writes chunks 4 and 5. Suppose we get acks for 1, 2, 4, and 5. The right thing to do is to apply the acks for 1 and 2, hold the acks for 4 and 5, and then, if the timeout occurs, restart from 3. But right now, we just assume that an ack for chunk n+1 implies that chunks 0 through n have all committed. That isn't really right (see the sketch below).

          There are two plausible fixes. The first is to automatically reset each running adaptor whenever we switch collectors; this makes (2) very easy to solve, at the expense of making dynamic load balancing harder. The second is to use timeouts, and really confront (2) head-on.
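
          If we go the timeout route, the bookkeeping for (2) amounts to only advancing the checkpoint over a contiguous prefix of acked chunks; a sketch (illustrative, not patch code):

            // Illustrative: acks can arrive out of order once an agent has used more than
            // one collector; only a contiguous prefix of acked chunk numbers is safe.
            import java.util.TreeSet;

            public class AckTracker {
              private long checkpointed = 0;                              // highest chunk safely committed
              private final TreeSet<Long> acked = new TreeSet<Long>();    // acked but not yet contiguous

              public synchronized void ack(long chunkNum) {
                acked.add(chunkNum);
                // advance only while the next expected chunk has been acked
                while (!acked.isEmpty() && acked.first().longValue() == checkpointed + 1) {
                  checkpointed = acked.pollFirst().longValue();
                }
              }

              public synchronized long checkpoint() { return checkpointed; }
            }

          In the scenario above, acks for 1, 2, 4, and 5 advance the checkpoint to 2 and then hold; if the timeout for 3 expires, the agent rewinds its adaptors to that checkpoint and resends from there.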

          Ari Rabkin added a comment -

          Split some of the refactorings out as their own issue.

          Ari Rabkin added a comment -

          Tested the latest version of this code on EC2: 128 agents, 5 collectors. The collectors were I/O-bound and able to saturate HDFS. No data was lost when I killed two of the collectors.

          Any advice on what would make a good test scenario?

          Eric Yang added a comment -

          Could you count the number of duplicated chunks that were sent?

          Ari Rabkin added a comment -

          Yes; I'm measuring that. Chunks only get duplicated when a collector crashes; the number of duplicate chunks is basically just the amount of data lost in .chukwa files. So for a single collector failure, it's (write rate) * (period between rotations). This means that the fraction of duplicate data is just (period between rotations) / (mean time between failures)

          So if you assume that collectors crash once a week on average, and that the rotation rate is every five minutes, then the fraction of duplicate data is 0.05%.

          And my measurements bear this out.

          Jerome Boulon added a comment -

          Ari, I've been busy with something else, but I want to ask some questions before anyone gives an eventual +1; in the meantime I will review your patch:

          • Do you need to get your ack from the same collector you sent the data to?
          • What happens when the collector rotates?
          • What happens if someone puts a real HTTP load balancer in front of the collectors?
          • Do we have a long-running connection between agent and collector?
          Ari Rabkin added a comment -
          • You do not need to get an acknowledgment from the same collector you sent the data to. The "ack" is really just a confirmation that the file in question rotated OK and was a sufficient length when it rotated.
          • Collectors don't need to do anything special on rotation.
          • There's no long-running TCP connection between agent and collector. But my current implementation does assume that an agent will continue to use a single collector until it gets an IOException. For now, I'm not using timeouts; instead, the agent relies on getting an IOException from a down collector (sketched below). This is simpler, but it would require modification if we started doing dynamic load balancing across collectors.
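
          The failover behavior in that last point is essentially the following (stand-in types only; the real logic lives in the Sender and Connector code):

            // Stand-in sketch: stay with the current collector until an IOException,
            // then fail over to the next one and retry the same chunk.
            import java.io.IOException;
            import java.util.Iterator;
            import java.util.List;

            public abstract class CollectorFailover {
              private final Iterator<String> collectors;   // collector URLs to try; assumes a non-empty list
              private String current;

              public CollectorFailover(List<String> collectorUrls) {
                this.collectors = collectorUrls.iterator();
                this.current = collectors.next();
              }

              /** post() stands in for the HTTP send; only an IOException triggers failover. */
              protected abstract String post(String collectorUrl, byte[] chunk) throws IOException;

              public String send(byte[] chunk) throws IOException {
                while (true) {
                  try {
                    return post(current, chunk);            // returns the ack or witness string
                  } catch (IOException e) {
                    if (!collectors.hasNext()) throw e;     // no collectors left: give up for now
                    current = collectors.next();            // fail over and retry the same chunk
                  }
                }
              }
            }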
          Ari Rabkin added a comment -

          Latest version of code.

          Ari Rabkin added a comment -

          I've now tested this fairly extensively, at data rates up to 200 MB/sec, with up to 256 agents and 20 collectors. It's looking very good and I want to commit it.

          • Substantial tests are included.
          • The async ack mechanism is controlled by a conf option and defaults to off. So if you're hesitant about it, you don't need to use it, and everything should remain the way it was.
          • Even if it's turned on, collectors can still respond with an immediate ack if they happen to write synchronously (e.g., if the collector is writing to HBase or the local filesystem).
          • I tried pretty hard to code this in such a way that we can easily evolve and adapt the code to support other reliability strategies in the future.
          Ari Rabkin added a comment -

          [And yes, this was intended as a call for a vote on committing]

          Eric Yang added a comment -

          +1 I don't find any reason not to include this.

          Ari Rabkin added a comment -

          Passes unit tests here, and I'm committing.

          Hudson added a comment -

          Integrated in Chukwa-trunk #130 (See http://hudson.zones.apache.org/hudson/job/Chukwa-trunk/130/)
          CHUKWA-369. Tolerance of collector failures.


            People

            • Assignee: Ari Rabkin
            • Reporter: Ari Rabkin
            • Votes: 0
            • Watchers: 0
