Hadoop HDFS
HDFS-895

Allow hflush/sync to occur in parallel with new writes to the file

    Details

    • Hadoop Flags:
      Reviewed

      Description

      In the current trunk, the HDFS client methods writeChunk() and hflush/sync are synchronized. This means that if an hflush/sync is in progress, an application cannot write data to the HDFS client buffer. This reduces the write throughput of the transaction log in HBase.

      The hflush/sync should allow new writes to the HDFS client even when an hflush/sync is in progress. It can record the seqno of the message for which it should receive the ack, indicate to the DataStreamer thread to start flushing those messages, exit the synchronized section and just wait for that ack to arrive.
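
      As a rough sketch of that idea (not the committed patch: waitForAckedSeqno() is a hypothetical helper here, while names such as dataQueue, lastAckedSeqno, currentPacket and queueCurrentPacket follow the discussion below), the flush path would look roughly like this:

      // Sketch only: record the target seqno under the lock, then wait for
      // its ack outside the lock so writers can keep adding new data.
      public void hflush() throws IOException {
        long toWaitFor;
        synchronized (this) {
          flushBuffer();                     // move buffered bytes into currentPacket
          toWaitFor = currentPacket.seqno;   // the ack that completes this flush
          queueCurrentPacket();              // hand the packet to the DataStreamer
        }
        // New writeChunk() calls may proceed while we wait below.
        waitForAckedSeqno(toWaitFor);
      }

      private void waitForAckedSeqno(long seqno) throws IOException {
        synchronized (dataQueue) {
          while (!closed && lastAckedSeqno < seqno) {
            try {
              dataQueue.wait(1000);          // ResponseProcessor notifies on each ack
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new IOException("Interrupted while waiting for ack", ie);
            }
          }
        }
        if (closed) {
          throw new IOException("Stream closed while waiting for ack " + seqno);
        }
      }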

      Attachments

      1. ASF.LICENSE.NOT.GRANTED--hdfs-895.txt
        15 kB
        Todd Lipcon
      2. ASF.LICENSE.NOT.GRANTED--hdfs-895-trunk.txt
        19 kB
        Todd Lipcon
      3. hdfs-895-20.txt
        16 kB
        Todd Lipcon
      4. hdfs-895-0.20-append.txt
        16 kB
        Todd Lipcon
      5. hdfs-895-review.txt
        20 kB
        Todd Lipcon
      6. hdfs-895.txt
        17 kB
        Todd Lipcon
      7. 895-delta-for-review.txt
        7 kB
        Todd Lipcon
      8. hdfs-895.txt
        21 kB
        Todd Lipcon
      9. hdfs-895-branch-20-append.txt
        17 kB
        Todd Lipcon
      10. hdfs-895-ontopof-1497.txt
        23 kB
        Todd Lipcon
      11. hdfs-895.txt
        22 kB
        Todd Lipcon
      12. hdfs-895.txt
        22 kB
        Todd Lipcon
      13. hdfs-895.txt
        21 kB
        Todd Lipcon
      14. hdfs-895.txt
        22 kB
        Todd Lipcon
      15. hdfs-895-0.20-append.txt
        19 kB
        Todd Lipcon
      16. HDFS-895.20-security.1.patch
        18 kB
        Jitendra Nath Pandey


          Activity

          Matt Foley added a comment -

          Closed upon release of 0.20.205.0

          Suresh Srinivas added a comment -

          I committed the patch to 0.20-security.

          Suresh Srinivas added a comment -

          +1 for the patch.

          Jitendra Nath Pandey added a comment -

          Patch uploaded for 20-security.

          Hairong Kuang added a comment -

          I've committed this. Thanks, Todd!

          Todd Lipcon added a comment -

          Stack requested a patch for 0.20-append. This one ought to work, but I haven't done testing aside from running the new unit test. It's based on the patch from CDH3 which has been deployed and tested at scale, but slightly modified based on the more recent changes to the trunk version.

          stack added a comment -

          @Todd Do we need a refresher on this patch for 0.20-append? Looks like you fixed up a few things subsequent to your last 0.20-append version. hdfs-724 has gone in as well as the fixup for the misapplication of the backport to 0.20-append and seems to be working properly. Thanks.

          Todd Lipcon added a comment -

          Yes, we should get this into 20-append for HBase. Right now there seems to be an issue with the HDFS-724 patch in 20-append, and since these touch very similar areas of the write pipeline, I want to either temporarily revert 724 from 20-append, or figure out what's wrong with it. No sense adding another variable into the mix when our current branch has some problems.

          dhruba borthakur added a comment -

          Yes, we could get this to 0.20-append too. Thanks Hairong.

          Hairong Kuang added a comment -

          Good catch, Todd!

          I've just committed this to trunk. Do we also need to get it in 0.20 append?

          Todd Lipcon added a comment -

           With this latest patch, the following tests fail:
          [junit] Test org.apache.hadoop.hdfs.TestFileStatus FAILED [ HDFS-1470 ]
          [junit] Test org.apache.hadoop.hdfs.TestHDFSTrash FAILED (timeout) [ HDFS-1471 ]
          [junit] Test org.apache.hadoop.hdfs.server.namenode.TestStorageRestore FAILED [ HDFS-1496 ]
          [junit] Test org.apache.hadoop.fs.TestHDFSFileContextMainOperations FAILED [ HDFS-874 ]
          [junit] Test org.apache.hadoop.hdfs.TestPipelines FAILED [ HDFS-1467 ]
          [junit] Test org.apache.hadoop.hdfs.tools.offlineImageViewer.TestOfflineImageViewer FAILED [ HDFS-1500 ]
          All of these also fail in trunk - I put the relevant JIRAs next to them above. So, no new failures caused by this patch.

          Test-patch results:

          [exec] -1 overall.
          [exec]
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec]
          [exec] +1 tests included. The patch appears to include 5 new or modified tests.
          [exec]
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec]
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec]
          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
          [exec]
          [exec] -1 release audit. The applied patch generated 97 release audit warnings (more than the trunk's current 1 warnings).
          [exec]
          [exec] +1 system test framework. The patch passed system test framework compile.

          The release audit is incorrect - same is true of an empty patch on HDFS trunk (known issue)

          Todd Lipcon added a comment -

           I ran the unit tests and caught one more bug that caused TestFiPipelineClose to fail. The issue was that if failure recovery happens in the PIPELINE_CLOSE stage, the "last packet in block" packet gets removed from dataQueue after the close() caller is already waiting for that sequence number. Thus the sequence number never comes and the caller of close() hangs. The fix is to set lastAckedSeqno to the lastPacketInBlock seqno when it is removed from dataQueue. I also added some asserts in this code path.
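
           A minimal sketch of that fix, using the names from this thread (endOfBlockPacket is a placeholder and the surrounding error-recovery code is omitted; the committed change may differ in shape):

           // When recovery drops the trailing "last packet in block" from
           // dataQueue instead of resending it, treat it as acked so the
           // close() caller waiting on its sequence number can proceed.
           if (endOfBlockPacket != null) {
             assert endOfBlockPacket.lastPacketInBlock;
             assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
             lastAckedSeqno = endOfBlockPacket.seqno;
             dataQueue.notifyAll();   // wake anyone blocked in the ack wait
           }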

          Hairong Kuang added a comment -

           +1. This looks good to me except for the following unnecessary import:
          import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;

          Could you please post ant patch & test results?

          Todd Lipcon added a comment -

           Changed queueCurrentPacket to waitAndQueueCurrentPacket().

          Also got rid of the hack for LeaseExpiredException, since we decided above that concurrent close and hflush() should be an IOException anyway.

          Hairong Kuang added a comment -

          The patch looks good. Two minor comments:
          1. line 1291: queueCurrentPacket should be waitAndQueueCurrentPacket
           2. I am a little uncomfortable about how LeaseExpirationException is handled. LeaseExpirationException does not always mean that the file is closed by the client. It may indicate that the client did not renew the lease and so the lease has really expired. But I guess eventually the client will find out when closing or getting the next block. Could you please update the comment, just for information.

          Please update the patch then post "ant patch" result and "ant test" result.

          Todd Lipcon added a comment -

          Previous patch had two javac warnings for using the deprecated MiniDFSCluster constructor. New patch just fixes those:

          < MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
          > MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

          Todd Lipcon added a comment -

          OK, here is a patch that applies against just trunk.

          Hairong Kuang added a comment -

          Todd, please produce a patch not on top of HDFS-1497.

          Todd Lipcon added a comment -

          I took out the bug fix for sequence number skip into a patch on HDFS-1497. This new patch, hdfs-895-ontopof-1497.txt applies on top of HDFS-1497 and adds the parallel flush capability. I think we should commit that bug fix first and then add parallel flush - the safety checks from 1497 make me more confident that we won't accidentally break this tricky code.

          Hairong Kuang added a comment -

          > Would it be better to open a very small JIRA to add the assert and fix for it, commit that, then commit this as an optimization? That would keep the two changes orthogonal and maybe easier to understand?

          The assertion is a nice thing to have. Please open a jira for adding this assertion. The ideal fix is not to create the packet. Let's focus this jira on parallel hflush, which is what hbase really needs.

          Todd Lipcon added a comment -

          Stack requested a provisional patch for branch-20-append. Here it is, not for commit until the discussion on trunk is resolved.

          Todd Lipcon added a comment -

           I still have a question: if lastFlushOffset == bytesCurBlock, when will this condition be true: oldCurrentPacket != null && currentPacket != null?

           I don't think that will ever be true. We do get the case oldCurrentPacket == null && currentPacket == null, though, when we call flush twice at the beginning of any block. So I think we can add an assert (assert oldCurrentPacket == null) in that else clause.

          Please understand I did not mean to give you a hard time

          No worries - I agree that this code is very tricky, which is why I'd like to keep the asserts at this point. The assert guards what we all thought was an invariant: sequence numbers should increase by exactly one with every packet. Nicolas also reviewed this code in depth a few months back, which is when we added this new currentSeqno-- bit. If I recall correctly we discussed a lot whether there was any bug where we could skip or repeat a sequence number, and when we added the assert for in-order no-skipping sequence numbers, we found this bug.

          Would it be better to open a very small JIRA to add the assert and fix for it, commit that, then commit this as an optimization? That would keep the two changes orthogonal and maybe easier to understand?

          Hairong Kuang added a comment -

           OK I see. So this piece of code is only for making the assertion work. I still have a question: if lastFlushOffset == bytesCurBlock, when will this condition be true: oldCurrentPacket != null && currentPacket != null?

           Please understand I did not mean to give you a hard time. I really think this seq# change is unrelated to this issue and unnecessary. It is simpler and less error prone just to remove it together with the assertion. The HDFS pipeline-side code is very complicated and hard to get right. I would prefer not to make any change unless necessary.

          Todd Lipcon added a comment -

           Hey Hairong. I had actually recalled incorrectly which part of that confusing code is new - only the "currentSeqno--" code is new, to prevent skipping a sequence number. Here's a diff that ignores whitespace changes:

                 // Flush only if we haven't already flushed till this offset.
                 if (lastFlushOffset != bytesCurBlock) {
          -
          +          assert bytesCurBlock > lastFlushOffset;
                   // record the valid offset of this flush
                   lastFlushOffset = bytesCurBlock;
          -
          -        // wait for all packets to be sent and acknowledged
          -        flushInternal();
          +          queueCurrentPacket();
                 } else {
                   // just discard the current packet since it is already been sent.
          +          if (oldCurrentPacket == null && currentPacket != null) {
          +            // If we didn't previously have a packet queued, and now we do,
          +            // but we don't plan on sending it, then we should not
          +            // skip a sequence number for it!
          +            currentSeqno--;
          +          }
                   currentPacket = null;
                 }
          

          As you can see we already had the code that avoided duplicate packets.

          stack added a comment -

           @Hairong I believe Todd is referring to this patch being run in production here at SU for the last 3 months, as well as whatever deploys there are atop CDHs that have this patch applied... not 0.22.

          Hairong Kuang added a comment -

          0.22 in production?

           I would prefer not to get the confusing code in, especially since this code is not related to this issue. Because once the code is in, it is very hard to get it out, especially when you work in an open community.

          Todd Lipcon added a comment -

          Ah, I see what you're saying... I think that would work in theory, but given we've had a lot of production testing of the patch as is, I'm a little nervous to make that change at this point and lose some of that confidence.

          Hairong Kuang added a comment -

          > but it does trigger an assertion if assertions are enabled - a sequence number will get "skipped".
          I do not understand how this would happen if the duplicate packet also gets sent to the pipeline. Did I miss anything?

          Todd Lipcon added a comment -

          Could this patch go without this fix? I would prefer to have a different jira to improve hflush without data. I think I filed a jira a while back.

          I agree that that bit of code is hard to understand. It also "works fine" without the fix, but it does trigger an assertion if assertions are enabled – a sequence number will get "skipped". So I would prefer to keep the fix in, and in the JIRA you mentioned (avoid creating a packet in the first place for empty flush) we can hopefully get rid of the confusing code. Is that alright?

          Hairong Kuang added a comment -

          > we call hflush() again without writing any more data
           I see that you are trying to improve the case when flushing twice without writing any data. I am still a little bit uncomfortable with this fix. (I feel that it is hard to understand and maintain. Ideally we should not create a packet in this case.)

          Could this patch go without this fix? I would prefer to have a different jira to improve hflush without data. I think I filed a jira a while back.

          Nicolas Spiegelberg added a comment -

          +1 peer reviewed this. looks pretty solid.

          dhruba borthakur added a comment -

          > - flushing a stream that's been closed throws IOE.

           I am not sure why the Java flush API is successful even if the stream is closed. But I would vote for the above semantics, thanks Todd.

          Todd Lipcon added a comment -

           Here's the full patch against trunk.

          Todd Lipcon added a comment -

          Here's a delta that shows the patch for the NPE issue for easy review. I'll also upload a full patch against trunk momentarily but figured it would be easier to look at just the changed bits.

          Although I think either interpretation of hflush() could make sense, I decided to leave it as it currently is - flushing a stream that's been closed throws IOE. If we want to change this in the future we can do it in a different JIRA rather than conflating a semantic change with this optimization.

          Hairong Kuang added a comment -

           Checked a few OutputStream implementations in Java. It seems that they implement flush as a no-op if the stream is closed. So I am OK if DFSOutputStream does the same.

          Hairong Kuang added a comment -

          I feel that hflush() should throw an exception if the stream is already closed. This is a pretty standard semantics.

          Todd Lipcon added a comment -

          The bug JD found is an NPE that happens if close() is called concurrent with hflush(). I have a patch that fixes this to IOE, but Nicolas and I have been discussing whether it should be a no-op instead. The logic is that if you append something, then some other thread close()s, then you call hflush(), your data has indeed already been flushed (ie is on disk). Right now hflush() checks that the stream is open first, but instead should it just return if the stream was closed in a non-error state?
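
           For concreteness, a sketch of the check under discussion (names are placeholders, not the actual patch); the behaviour that was ultimately kept in this issue is the exception path:

           // Sketch only. Option kept in this issue: hflush() on a closed
           // stream is an error.
           private void checkNotClosed() throws IOException {
             if (closed) {
               // The alternative debated here would instead return quietly
               // when the stream was closed cleanly, since everything written
               // before close() has already been flushed to the pipeline.
               throw new IOException("DFSOutputStream is already closed");
             }
           }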

          Todd Lipcon added a comment -

          Thanks to JD who reminded me of a small bug we had fixed on the 20 version of this patch that didn't make it into the trunk patch. Working on trunk patch and unit test now.

          Todd Lipcon added a comment -

          Thanks for the review.

          Does this work with the heartbeat packet?

           Lines 657-658 check for the heartbeat sequence number before we set lastAckedSeqno in ResponseProcessor.run(), so it should work as before.
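
           Roughly, the guard being referred to looks like this (a sketch; the heartbeat constant and exact field names may differ in the actual DFSClient code):

           // Heartbeat packets carry a sentinel seqno and must never advance
           // lastAckedSeqno, otherwise a flush could complete too early.
           if (seqno != HEART_BEAT_SEQNO) {
             synchronized (dataQueue) {
               lastAckedSeqno = seqno;
               dataQueue.notifyAll();   // wake threads waiting in hflush()
             }
           }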

          dataQueue.wait(1000); is it possible to use dataQueue.wait();

          Yes, I think that would also work. In theory, the timeout isn't necessary, but I've seen bugs before where a slight race causes us to miss a notify. Perhaps we can switch the synchronized (dataQueue) and the while (!closed) in this function and avoid the race? It seems to me that waking up once a second just to be extra safe doesn't really harm us, but maybe it's a bit of a band-aid.

          lines 1294-1299: when would this happen?

          This is a bit subtle - it was one of the bugs in the original version of the patch. Here's the sequence of events that it covers:

          • write 10 bytes to a new file (ie no packet yet)
          • call hflush()
            • it calls flushBuffer, so that it enqueues a new "packet"
            • it sends that packet - now we have no currentPacket, and the 10 bytes are still in the checksum buffer, lastFlushOffset=10
          • we call hflush() again without writing any more data
            • it calls flushBuffer, so it creates a new "packet" with the same 10 bytes
            • we notice that the bytesCurBlock is the same as lastFlushOffset, hence we don't want to actually re-send this packet (there's no new data, so it's a no-op flush)
            • hence, we need to get rid of the packet and also decrement the sequence number so we don't "skip" one

          Without this fix, we were triggering 'assert seqno == lastAckedSeqno + 1;' when calling hflush() twice without writing any data in between

          Hairong Kuang added a comment -

          The patch looks good. A few questions:
          1. Does this work with the heartbeat packet?
          2. line 1387: dataQueue.wait(1000); is it possible to use dataQueue.wait();
          3. lines 1294-1299: when would this happen?
           if (oldCurrentPacket == null && currentPacket != null) {
             // If we didn't previously have a packet queued, and now we do,
             // but we don't plan on sending it, then we should not
             // skip a sequence number for it!
             currentSeqno--;
           }
          Todd Lipcon added a comment -

          Has anyone taken a look at the patch for trunk? I think we wanted to hold off putting this in branch-20-append until it's in trunk. The latest trunk patch appears to still apply. Once that has been reviewed I'll re-do the branch-20 patch. [hello from Kyoto!]

          Hairong Kuang added a comment -

           Stack, looks like we have to wait until Todd is back. This one is a performance issue. Theoretically the release could be cut without it.

          stack added a comment -

          @Hairong He's out for a few days (wandering temples in foreign lands)

          Hairong Kuang added a comment -

          Todd, could you please upload an updated patch for 0.20? Jonathan is asking me if I could commit this.

          Todd Lipcon added a comment -

          Here's the same patch as just one diff.

          FWIW, the 20-append patch posted here has been in our distro for a couple months with lots of people using the new feature through HBase with no issues. So I think it's pretty sound (and represents at least 25-30% improvement for HBase write throughput according to JD's comments in HBASE-2467)

          Todd Lipcon added a comment -

          Took some time to update this to newest trunk, based on the fixes in the 20-append patch. This attachment hdfs-895-review.txt shows the patch broken up into three separate commits - first two refactors and then the actual parallel sync feature. It should be easier to understand the patch and review it this way. Will upload the total patch as well.

          sam rash added a comment -

          re: the patch

          I realize in the hadoop code we already swallow InterruptedException frequently, but I think you can change the trend here:

          // wait for all acks to be received back from datanodes
                  synchronized (ackQueue) {
                    if (!closed && ackQueue.size() != 0) {
                      try {
                        ackQueue.wait();
                      } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // add this
                      }
                      continue;
                    }
                  }
          

           Otherwise, it's very easy to have a thread that I own and manage that has a DFSOutputStream in it that swallows an interrupt. When I check Thread.currentThread().isInterrupted() to see if one of my other threads has interrupted me, I will not see it.

           (The crux here is that swallowing interrupts in threads that hadoop controls is less harmful - this is directly in client code when you call sync()/close().)

          Todd Lipcon added a comment -

          It's not entirely alone - Linux these days (since 2.6.17) has sync_file_range(2) which is pretty similar

          dhruba borthakur added a comment -

           I like the idea of having an hflush(offset) type of call. It will be non-standard in the sense that I know of no other filesystem that has such a call, but it would benefit hbase a lot.

          Todd Lipcon added a comment -

           New patch against 20 that I've been testing with HBase. Will post a new patch against trunk this week; having some timeouts with FI tests that I need to understand.

          Todd Lipcon added a comment -

          Found a couple bugs in this, so cancelling patch available for the time being while I work them out.

          Also, I'd like to propose an extension to the hflush() API: hflush(long fileOffset) would ensure that the file has been flushed up to a length >= fileOffset. We can do this pretty easily by examining the ackQueue and dataQueue - if we find a packet in either queue that contains the given offset, we wait for that seqnum to be acked. If we don't find such a packet, and the byte offset is smaller than the top of ackQueue, it's already been acked, and if it's larger than the top of dataQueue, we need to flush like we're doing now.

          This would be very useful to HBase, where the appends to the logs don't want to happen in a synchronized block with the flush. Each thread only cares about syncing up to its last write offset.
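
           A sketch of how that proposed hflush(long fileOffset) might look (this API was never committed; Packet.lastByteOffsetInFile(), lastAckedByteOffset and waitForAckedSeqno are hypothetical names used only for illustration):

           public void hflush(long fileOffset) throws IOException {
             Long seqnoToWaitFor = null;
             synchronized (dataQueue) {
               // Look for a sent-but-unacked or still-queued packet that
               // covers the requested offset.
               for (Packet p : ackQueue) {
                 if (p.lastByteOffsetInFile() >= fileOffset) {
                   seqnoToWaitFor = p.seqno;
                   break;
                 }
               }
               if (seqnoToWaitFor == null) {
                 for (Packet p : dataQueue) {
                   if (p.lastByteOffsetInFile() >= fileOffset) {
                     seqnoToWaitFor = p.seqno;
                     break;
                   }
                 }
               }
             }
             if (seqnoToWaitFor != null) {
               waitForAckedSeqno(seqnoToWaitFor);  // wait only for this thread's data
             } else if (fileOffset > lastAckedByteOffset) {
               hflush();                           // not queued yet: full flush
             }
             // else: offset already acked, nothing to do
           }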

          Todd Lipcon added a comment -

          Here's a patch against trunk. There are a couple TODOs in there with questions for reviewers. I haven't had a chance to run any benchmarks for the trunk version, but should be similar speedup.

          Todd Lipcon added a comment -

          Here's a preliminary patch against 0.20 sync (will forward port it, but HBase on 20 makes a good testing ground). It could do with a thorough code review, as this is tricky code, but the general idea is simple enough. Also I want to augment the unit test to do some data verification.

           The included test case can also be run as a benchmark, where it runs 10 threads, each of which just appends 511-byte chunks and calls sync for each one. With the patched DFSClient, it runs in about 33 seconds on my test cluster. Without the patched DFSClient it took 290 seconds (and jstack shows most threads blocked most of the time). This confirms what we expected - there's a lot of parallelism to be gained for multithreaded writers.
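
           For reference, the benchmark described above has roughly this shape (a sketch, not the attached test; whether the flush call is hflush() or sync() depends on the branch, and imports from org.apache.hadoop.fs and java.util.concurrent are omitted):

           // Assumes an initialized FileSystem 'fs': 10 threads, 511-byte
           // records, one shared output stream, flush after every append.
           void runBenchmark(final FileSystem fs) throws Exception {
             final FSDataOutputStream out = fs.create(new Path("/bench/parallel-flush"));
             final byte[] chunk = new byte[511];
             ExecutorService pool = Executors.newFixedThreadPool(10);
             for (int t = 0; t < 10; t++) {
               pool.submit(new Runnable() {
                 public void run() {
                   try {
                     for (int i = 0; i < 1000; i++) {
                       synchronized (out) {   // appends stay serialized by the caller
                         out.write(chunk);
                       }
                       out.hflush();          // flushes can now overlap with writes
                     }
                   } catch (IOException e) {
                     throw new RuntimeException(e);
                   }
                 }
               });
             }
             pool.shutdown();
             pool.awaitTermination(10, TimeUnit.MINUTES);
             out.close();
           }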

          dhruba borthakur added a comment -

          Hi Todd, I am not working on this one yet and if you have an implementation that will be great.

          The HBase use-case is that one thread will be calling the hflush() on a file handle while many other threads could be trying to write concurrently to that same file handle.

          Todd Lipcon added a comment -

          Anybody working on this? I'm interested in doing so, if not.

          Joydeep Sen Sarma added a comment -

           Very cool, thanks. I suspect the speedup would be greater with a higher number of clients - but this seals the deal.

          Jean-Daniel Cryans added a comment -

          I did the experiment Joydeep described on 1 machine. I replaced the line where we call hflush in the write-ahead-log with:

                    if(now % 3 == 0)
                      Thread.sleep(1);
          

           Because first, if I just slept for 1ms it was already 2-3 times slower than normal sync time; I guess it's because it's very hard for the JVM to schedule such a small sleep time. The "now" variable is a System.currentTimeMillis() call made just before and used for other metrics.

          So with this modification a single client takes as much time inserting as with normal sync and 4 clients take almost the same time on average to insert a value. With sync and 4 clients, it takes twice the time to insert a single value.

          It would tend to confirm that the synchronization between append and sync costs a lot for multi-threaded clients.

          dhruba borthakur added a comment -

          > Do we then allow multiple concurrent sync calls, each waiting for a different seqno? Sounds like yes.

          This sounds right.

          Joydeep Sen Sarma added a comment -

           I think it's worth verifying that this would actually help hbase throughput (just a theory right now, I think).

           We could set the hbase queue threshold to 1 and test with a fake sync (that just returns immediately) and a real sync, and see what the difference is (is the sync time really holding back overall throughput, as intuition says it should be?).

           Also - the proposal is to overlap actual network traffic and not just the buffer copies across app/dfs - right?

          Hairong Kuang added a comment -

          oops, I meant HDFS-896.

          Hairong Kuang added a comment -

           +1. This sounds good. The only problem is that it is not easy to use because the application is forced to use multiple threads to write data. I guess what hbase needs is flush API1 as discussed in HADOOP-6313. I created an issue HDFS-895 to track the implementation.

          Todd Lipcon added a comment -

          Do we then allow multiple concurrent sync calls, each waiting for a different seqno? Sounds like yes.


            People

            • Assignee:
              Todd Lipcon
              Reporter:
              dhruba borthakur
            • Votes:
              1
              Watchers:
              15
