Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.16.0
    • Component/s: None
    • Labels:
      None

      Description

      The DFS client currently uses a staging file on local disk to cache all user writes to a file. When the staging file accumulates one block's worth of data, its contents are flushed to an HDFS datanode. These operations occur sequentially.

      A simple optimization of allowing the user to write to another staging file while simultaneously uploading the contents of the first staging file to HDFS will improve file-upload performance.
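
      The overlap described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the attached patches: `DoubleBufferedWriter` and its members are hypothetical names, in-memory byte arrays stand in for the staging files, and appending to a list stands in for the upload to a datanode.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: the writer fills one buffer while the previous
// buffer is uploaded by a background thread.
final class DoubleBufferedWriter {
    private final int blockSize;
    private final ExecutorService uploader = Executors.newSingleThreadExecutor();
    private final List<byte[]> uploaded = new ArrayList<>(); // stand-in for HDFS
    private ByteArrayOutputStream staging = new ByteArrayOutputStream();
    private Future<?> pendingUpload; // at most one upload in flight

    DoubleBufferedWriter(int blockSize) {
        this.blockSize = blockSize;
    }

    void write(byte[] data) {
        for (byte b : data) {
            staging.write(b);
            if (staging.size() == blockSize) {
                flushStaging();
            }
        }
    }

    // Hand the full buffer to the uploader and continue with a fresh one.
    private void flushStaging() {
        waitForPendingUpload(); // only two buffers exist at any time
        final byte[] block = staging.toByteArray();
        staging = new ByteArrayOutputStream();
        pendingUpload = uploader.submit(() -> {
            synchronized (uploaded) {
                uploaded.add(block);
            }
        });
    }

    private void waitForPendingUpload() {
        if (pendingUpload == null) {
            return;
        }
        try {
            pendingUpload.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while uploading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("upload failed", e);
        }
        pendingUpload = null;
    }

    // Flush the partial last block, wait for it, and return what was uploaded.
    List<byte[]> close() {
        if (staging.size() > 0) {
            flushStaging();
        }
        waitForPendingUpload();
        uploader.shutdown();
        synchronized (uploaded) {
            return new ArrayList<>(uploaded);
        }
    }
}
```

      The single-threaded executor keeps blocks in write order, and waiting for the previous upload before starting the next one bounds the client to two buffers.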

      1. clientDiskBuffer27.patch
        136 kB
        dhruba borthakur
      2. clientDiskBuffer27.patch
        136 kB
        dhruba borthakur
      3. clientDiskBuffer26.patch
        136 kB
        dhruba borthakur
      4. clientDiskBuffer25.patch
        135 kB
        dhruba borthakur
      5. clientDiskBuffer24.patch
        132 kB
        dhruba borthakur
      6. clientDiskBuffer24.patch
        116 kB
        dhruba borthakur
      7. clientDiskBuffer23.patch
        113 kB
        dhruba borthakur
      8. clientDiskBuffer23.patch
        113 kB
        dhruba borthakur
      9. clientDiskBuffer22.patch
        129 kB
        dhruba borthakur
      10. clientDiskBuffer21.patch
        129 kB
        dhruba borthakur
      11. clientDiskBuffer20.patch
        129 kB
        dhruba borthakur
      12. clientDiskBuffer19.patch
        126 kB
        dhruba borthakur
      13. clientDiskBuffer18.patch
        125 kB
        dhruba borthakur
      14. clientDiskBuffer17.patch
        126 kB
        dhruba borthakur
      15. clientDiskBuffer16.patch
        117 kB
        dhruba borthakur
      16. clientDiskBuffer15.patch
        111 kB
        dhruba borthakur
      17. clientDiskBuffer14.patch
        99 kB
        dhruba borthakur
      18. clientDiskBuffer12.patch
        97 kB
        dhruba borthakur
      19. clientDiskBuffer11.patch
        97 kB
        dhruba borthakur
      20. clientDiskBuffer10.patch
        97 kB
        dhruba borthakur
      21. clientDiskBuffer9.patch
        97 kB
        dhruba borthakur
      22. clientDiskBuffer8.patch
        96 kB
        dhruba borthakur
      23. clientDiskBuffer7.patch
        94 kB
        dhruba borthakur
      24. DataTransferProtocol.html
        18 kB
        dhruba borthakur
      25. DataTransferProtocol.doc
        44 kB
        dhruba borthakur
      26. clientDiskBuffer6.patch
        89 kB
        dhruba borthakur
      27. clientDiskBuffer2.patch
        49 kB
        dhruba borthakur
      28. clientDiskBuffer.patch
        45 kB
        dhruba borthakur

        Issue Links

          Activity

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373344/clientDiskBuffer27.patch
          against trunk revision r612933.

          @author +1. The patch does not contain any @author tags.

          patch -1. The patch command could not apply the patch.

          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1624/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          I just committed this.

          dhruba borthakur added a comment -

          I am ignoring the two findbugs warnings because the code maintains a strict locking hierarchy.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373344/clientDiskBuffer27.patch
          against trunk revision r612674.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 3 new Findbugs warnings.

          core tests +1. The patch passed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1620/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1620/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1620/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1620/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          merged patch with latest trunk.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373299/clientDiskBuffer26.patch
          against trunk revision r612614.

          @author +1. The patch does not contain any @author tags.

          patch -1. The patch command could not apply the patch.

          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1616/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          The TestSetReplicationIncreasing test case takes a long time to run, usually on the order of 10+ minutes, because the default timeout of a replication request is 10 minutes. Changed the configuration so that the timeout for this test is 2 seconds, which makes the test case run a lot faster.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373225/clientDiskBuffer25.patch
          against trunk revision r612314.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 3 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests +1. The patch passed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1609/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1609/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1609/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1609/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          Fixed failure in TestCrcCorruption.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373189/clientDiskBuffer24.patch
          against trunk revision r612200.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 2 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests +1. The patch passed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1601/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1601/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1601/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1601/console

          This message is automatically generated.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373138/clientDiskBuffer23.patch
          against trunk revision r612025.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 2 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1590/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1590/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1590/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1590/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          For the unit tests to run faster, reduced the socket timeout period from a default of 1 minute to 5 seconds.

          dhruba borthakur added a comment -

          Reduced the number of datanode threads in the unit test from 40 to 15; otherwise the unit tests take a long time to complete.

          dhruba borthakur added a comment -

          The Solaris platform exposed a race condition where an InterruptedException was interrupting the PacketHandler, causing it to not send an ack message for the last packet in a block.

          dhruba borthakur added a comment -

          Hadoop QA patch testing sees unit test failures that cannot be reproduced on Linux machines. Re-submitting the patch with additional debugging so that the failing unit test can be diagnosed. This is in lieu of having a direct account on the Solaris machine on which Hadoop QA patch testing occurs.

          dhruba borthakur added a comment -

          Debugging switched on to catch a problem that occurs repeatedly in the Hadoop QA patch testing environment.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373081/clientDiskBuffer21.patch
          against trunk revision r611760.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 2 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1583/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1583/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1583/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1583/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          More findbugs issues and test failures.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12373048/clientDiskBuffer20.patch
          against trunk revision r611537.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 2 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1569/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1569/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1569/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1569/console

          This message is automatically generated.

          dhruba borthakur added a comment -

          Fixed findbugs warnings. The two remaining findbugs warnings (about holding two locks while invoking wait()) are valid scenarios.

          dhruba borthakur added a comment -

          Cancel patch to fix findbugs warnings.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12372949/clientDiskBuffer19.patch
          against trunk revision r611385.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs -1. The patch appears to introduce 10 new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1555/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1555/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1555/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1555/console

          This message is automatically generated.

          Mukund Madhugiri added a comment -

          Running on a 100 node cluster, with the patch clientDiskBuffer19.patch, the sort benchmark showed these results:

          100 nodes              trunk   trunk + patch
          randomWriter (hrs)     0.44    0.45
          sort (hrs)             1.03    1
          sortValidation (hrs)   0.39    0.3

          dhruba borthakur added a comment -

          A sort on a 500-node cluster detected data corruption. The datanode code had a race whereby the block confirmation sent for a block did not carry the block's correct size. This caused the namenode to think that the block was shorter than it actually was.

          dhruba borthakur added a comment -

          The earlier patch had LOG levels set to debug by default.

          dhruba borthakur added a comment -

          Merged patch with latest trunk. This patch has additional debugging that might help in getting to the cause of the performance degradation seen in one earlier run.

          dhruba borthakur added a comment -

          This patch fixes another performance degradation shown by the earlier patch. There was a race condition whereby an intermediate datanode in the pipeline was ignoring the response sent from the downstream datanode, always forwarding an "error" to the client.

          dhruba borthakur added a comment -

          Found a race condition that was causing the client to close the connection before the datanodes had a chance to process the end-of-packet. The datanode treated this as an error condition, which made the client do error recovery and re-send the outstanding packets to the remaining good datanodes. This was causing a performance regression.
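
          One way to avoid this kind of race is for the client to wait until every sent packet has been acknowledged before it closes the connection. The sketch below is a hedged illustration only: `AckTracker` and its methods are hypothetical names, and the patch's real fix is not shown in this thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: track sent packets until their acks arrive, so the
// caller can delay closing the connection until nothing is outstanding.
final class AckTracker {
    // Sequence numbers of packets that were sent but not yet acknowledged.
    private final Deque<Long> ackQueue = new ArrayDeque<>();

    synchronized void packetSent(long seqno) {
        ackQueue.addLast(seqno);
    }

    // Acks are expected in the same order in which packets were sent.
    synchronized void ackReceived(long seqno) {
        Long head = ackQueue.peekFirst();
        if (head == null || head != seqno) {
            throw new IllegalStateException("unexpected ack " + seqno);
        }
        ackQueue.removeFirst();
        notifyAll(); // wake a closer blocked in awaitAllAcks()
    }

    // Returns true once all packets are acked; false on timeout or interrupt.
    synchronized boolean awaitAllAcks(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!ackQueue.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible
                return false;
            }
        }
        return true;
    }
}
```

          A caller would invoke `awaitAllAcks(...)` before closing the socket and treat a `false` result as a reason to start error recovery.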

          dhruba borthakur added a comment -

          Thanks Mukund. The errors are causing the numbers to go up. I will dig into the logs and code to find the cause of the errors.

          Mukund Madhugiri added a comment -

          I ran sort benchmark on 500 nodes and here is the data:

          trunk:

          • random writer: 0.405 hrs
          • sort: 1.508 hrs
          • sort validation: 0.333 hrs

          trunk + patch:

          • random writer: 0.534 hrs
          • sort: 1.808 hrs
          • sort validation: 0.408 hrs

          During the sort reduce phase, I observed some errors, but the sort eventually succeeded:

          • java.io.IOException: Could not get block locations. Aborting...
          • java.io.IOException: All datanodes are bad. Aborting...

          dhruba borthakur added a comment -

          Merged with latest trunk. Also fixed a bug where an InterruptedException was being consumed silently, leading to long delays in timing out.
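
          For readers unfamiliar with this class of bug, a common way to avoid swallowing an interrupt is to restore the thread's interrupt flag in the catch block. The sketch below is a generic illustration with a hypothetical helper name (`InterruptAware.sleepQuietly`); it is not taken from the patch.

```java
// Hypothetical helper: sleep without silently consuming an interrupt.
final class InterruptAware {
    // Returns true if the full sleep completed, false if it was interrupted.
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep() cleared the flag when it threw; set it again so
            // callers further up can still see that an interrupt happened.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

          Code that catches the exception and does nothing leaves its callers waiting as if no interrupt had occurred, which matches the long delays described above.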

          dhruba borthakur added a comment -

          Setting the TCP buffer size to 64K (instead of the default of 8K) and setting tcpNoDelay() on the response socket improves performance by about 5%.
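
          In plain `java.net.Socket` terms, these settings look roughly like the sketch below. The class name `SocketTuning` is hypothetical, and applying the size to both the send and the receive buffer is an assumption; the comment does not say which buffer the patch changes.

```java
import java.net.Socket;
import java.net.SocketException;

// Hypothetical sketch of the socket settings mentioned above.
final class SocketTuning {
    static final int BUFFER_SIZE = 64 * 1024; // 64K, as in the comment

    static void tune(Socket socket) throws SocketException {
        // Buffer sizes are hints; the operating system may adjust them.
        socket.setSendBufferSize(BUFFER_SIZE);
        socket.setReceiveBufferSize(BUFFER_SIZE);
        // Disable Nagle's algorithm so small writes such as acks go out at once.
        socket.setTcpNoDelay(true);
    }
}
```

          Because the buffer sizes are only hints, `getSendBufferSize()` may report a different value than the one requested.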

          Konstantin Shvachko added a comment -

          I think this patch has been tested quite thoroughly, and I don't see any algorithmic flaws in it.
          The logic is fairly complicated though, so imo

          1. we need better documentation either in JavaDoc or at least in Jira.
          2. it would be good if you could extract common actions for the client and the data-node into
            separate classes, not inner ones.

          =========== DFSClient.java

          • DFSClient: 4 unused variables, members.
          • DFSOutputStream.lb should be local variable.
          • processDatanodeError() and DFSOutputStream.close() have common code.
          • BlockReader.readChunk()
            07/12/04 18:36:22 INFO fs.FSInputChecker: DFSClient readChunk got seqno 14 offsetInBlock 7168
            

            Should be DEBUG.

          • More comments: What is e.g. dataQueue, ackQueue, bytesCurBlock?
          • Some new members in DFSOutputStream can be calculated from the other.
            No need to store them all. See e.g.
                private int packetSize = 0;
                private int chunksPerPacket = 0;
                private int chunksPerBlock = 0;
                private int chunkSize = 0;
            
          • In the line below "8" should be defined as a constant. Otherwise, the meaning of that is not clear.
                  chunkSize = bytesPerChecksum + 8; // user data + checksum
            
          • currentPacket should be a local variable of writeChunk()
          • The 4 in the code snippet below looks mysterious:
                  if (len + cklen + 4 > chunkSize) {
            
          • why start ResponseProcessor in processDatanodeError()
          • some methods should be moved into new inner classes, like
            nextBlockOutputStream() should be a part of DataStreamer
          • Packet should be factored out to a separate class (named probably DataPacket).
            It should have serialization/deserialization methods for packet header, which should
            be reused in DFSClient and DataNodes for consistency in data transfer.
            It should also have methods readPacket() and writePacket()

          =========== DataNode.java

          • import org.apache.hadoop.io.Text; is redundant.
          • My Eclipse shows 5 variables that are "never read".
          • Rather than using "4" on several occasions a constant should be defined
            SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
            

            and used whenever required.

          • lastDataNodeRun() should not be public
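
As a small illustration of the named-constant suggestion above, the sketch below defines SIZE_OF_INTEGER as proposed and uses it in place of a literal 4. The WireSizes class and the headerLength() helper are hypothetical and are not part of the patch.

```java
final class WireSizes {
  /** Number of bytes in a Java int, as suggested in the review. */
  static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;

  /** Bytes needed for a hypothetical header made of intFields int fields. */
  static int headerLength(int intFields) {
    return intFields * SIZE_OF_INTEGER;
  }
}
```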

          =========== FSDataset.java

          • writeToBlock(): There are two searches in a map instead of one.
                  if (ongoingCreates.containsKey(b)) {
                    ActiveFile activeFile = ongoingCreates.get(b);
            
          • unfinalizeBlock() I kinda find the name funny.
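
The double lookup flagged for writeToBlock() can be avoided with a single get() and a null check, provided the map never stores null values. In this sketch String stands in for the real Block and ActiveFile types, and the OngoingCreates class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class OngoingCreates {
  // Stand-ins: String keys and values replace the real Block and ActiveFile types.
  private final Map<String, String> ongoingCreates = new HashMap<>();

  void put(String block, String activeFile) {
    ongoingCreates.put(block, activeFile);
  }

  String describe(String b) {
    // One search: get() returns null when the key is absent.
    String activeFile = ongoingCreates.get(b);
    if (activeFile != null) {
      return "ongoing create: " + activeFile;
    }
    return "no ongoing create";
  }
}
```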

          =========== General

          • Convert comments like // .......... to JavaDoc /** ... */ style comments
            when used as method or class headers even if they are private.
          • Formatting. Tabs should be replaced by 2 spaces. Eg: ResponseProcessor.run(), DataStreamer.run()
          • Formatting. Long lines.
          dhruba borthakur added a comment -

          Merged with latest trunk.

          dhruba borthakur added a comment -

          Fixed a bug that was causing the client to hang if all datanodes in the pipeline reported an error. This situation was triggered while testing this patch on a 500 node cluster.

          dhruba borthakur added a comment -

          Make patch compile with JDK 1.5

          Konstantin Shvachko added a comment -

          Since you have just encountered that: the same problem can potentially occur in the following 3 methods

          • nextBlockOutputStream()
          • locateFollowingBlock()
          • DFSOutputStream.close()

          where the client sleeps under a lock. In general a thread should wait() instead of sleep() under a lock.
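
The difference matters because Thread.sleep() keeps the monitor held, while Object.wait() releases it until the thread is notified or the timeout expires. A minimal sketch of the wait-based pattern follows; the BlockingRetry class and its method names are hypothetical and do not come from the patch.

```java
final class BlockingRetry {
  private final Object lock = new Object();
  private boolean ready = false;

  /** Waits up to timeoutMs for the ready flag; the lock is released while waiting. */
  boolean awaitReady(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (lock) {
      while (!ready) { // loop guards against spurious wakeups
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return false; // timed out
        }
        lock.wait(remaining); // unlike Thread.sleep(), this releases the monitor
      }
      return true;
    }
  }

  /** Called by another thread; can enter the monitor because the waiter released it. */
  void markReady() {
    synchronized (lock) {
      ready = true;
      lock.notifyAll();
    }
  }
}
```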

          dhruba borthakur added a comment -

          Fixed two bugs that were exposed while running random writer on a 100 node cluster.

          1. The code was such that it was waiting for the ResponseThread to exit while holding the lock on dataQueue. This caused a deadlock.

          2. The DFSClient was sending the packet to the first datanode before it inserted the packet into the ackQueue. If the response from the datanode arrived before the DFSClient had enqueued the packet into the ackQueue, it triggered an error. This situation is now avoided because the DFSClient first inserts the packet into the ackQueue and only then sends the packet to the datanode.
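
The ordering fix in item 2 can be sketched as follows. This is an illustration only: the AckOrdering class, the LongConsumer stand-in for the network send, and the use of bare sequence numbers instead of packets are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongConsumer;

final class AckOrdering {
  private final Deque<Long> ackQueue = new ArrayDeque<>();

  /** Records the packet as awaiting an ack BEFORE it goes on the wire. */
  void send(long seqno, LongConsumer wire) {
    synchronized (ackQueue) {
      ackQueue.addLast(seqno);
    }
    wire.accept(seqno); // the ack may arrive at any time after this call starts
  }

  /** Returns true if the ack matches the oldest outstanding packet. */
  boolean onAck(long seqno) {
    synchronized (ackQueue) {
      Long head = ackQueue.peekFirst();
      if (head == null || head != seqno) {
        return false; // unexpected ack: nothing pending, or out of order
      }
      ackQueue.removeFirst();
      return true;
    }
  }
}
```

The lock is released before the send so that the response thread is never blocked behind network I/O.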

          dhruba borthakur added a comment -

          Merged patch with latest trunk.

          Doug Cutting added a comment -

          This protocol document is great to have! Can we get it converted into forrest-compatible XML and included in a reference section of the documentation when this patch is committed?

          dhruba borthakur added a comment -

          The Data Transfer Protocol document in HTML format.

          dhruba borthakur added a comment -

          A document that describes the streaming protocol used to transfer data among clients and datanodes.

          dhruba borthakur added a comment -

          This patch removes the client side disk buffer.

          1. FSConstants.java : Bumped up DATA_TRANSFER_VERSION.
          2. Daemon.java: Added a ThreadGroup to the Daemon class. All worker threads that process data transfers belong to this group. The shutdown of a datanode waits for the entire thread group to exit. Prior to this change, a datanode shutdown did not wait for the data transfer threads to exit.
          3. FSNamesystem.java: Allows a zero size file to have no blocks associated with it.
          4. DataChecksum.java: A utility method to return the size of a checksum header.
          5. FSDataset.java: The ongoingCreates data structure remembers the thread that is currently writing to a block. The writeToBlock() method (when the recovery flag is set) terminates any existing threads that might have been writing to a block before allowing a new thread to write to the same block.
          6. FSDataOutputStream.java: The unit test needed to extract the pipeline associated with a block. This is facilitated by exposing a new public API called getWrappedStream() that returns the underlying DFSOutputStream object.
          7. MiniDFSCluster.java: Allows stopping a particular datanode.
          8. DFSClient.java/DataNode.java: User data is transferred in the form of packets. Each Packet requires an ack from all datanodes. The DFSClient drives the entire recovery strategy. A keepalive is sent every READ_TIMEOUT/2 period on the response socket channel. Each packet is 64K in size and the client has a sliding window of 5MB per stream.
          9. TestDatanodeDeath.java: A unit test to trigger datanode deaths and DFSClient recovery.
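
For item 8, the packet size and window size together bound the number of unacknowledged packets per stream. The arithmetic below assumes binary units (64K = 65,536 bytes, 5MB = 5,242,880 bytes); the WindowMath class and its constant names are hypothetical.

```java
final class WindowMath {
  static final int PACKET_SIZE = 64 * 1024;        // 64K per packet
  static final int WINDOW_BYTES = 5 * 1024 * 1024; // 5MB sliding window per stream

  /** Maximum number of full packets that can be awaiting acks at once. */
  static int packetsInWindow() {
    return WINDOW_BYTES / PACKET_SIZE;
  }
}
```

Under these assumptions the client can have at most 80 full packets in flight per stream.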

          Raghu Angadi added a comment -

          > 2. Each datanode sends a heartbeat message to the upstream datanode once every half-timeout-period.
          It might be better to call this 'KeepAlive' since it is per connection and avoids confusion with DataNode heartbeat.

          dhruba borthakur added a comment -

          In the current trunk, the first datanode in the pipeline sets a timeout of 2 minutes. The second datanode sets a timeout of 1 minute, and so on. If a datanode does not receive a response from a downstream datanode within this timeout period, it declares the downstream datanode as dead.

          In this patch for removing the client-side-disk buffer, the connection between datanodes in the pipeline could remain open for extended periods of time, especially for clients that are producing output slowly. I propose that we change the timeouts to behave as follows:

          1. Each datanode in the pipeline has the same timeout of 1 minute. If a datanode does not receive a response from a downstream datanode in 1 minute, it declares the downstream datanode as dead.
          2. Each datanode sends a heartbeat message to the upstream datanode once every half-timeout-period.
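
The two rules above can be sketched as a small liveness tracker: any message from downstream (an ack or a periodic keepalive) resets the clock, and the downstream datanode is declared dead once the timeout passes in silence. The DownstreamLiveness class is hypothetical, and timestamps are passed in by the caller to keep the sketch deterministic.

```java
final class DownstreamLiveness {
  private final long timeoutMs;
  private long lastHeardMs;

  DownstreamLiveness(long timeoutMs, long nowMs) {
    this.timeoutMs = timeoutMs;
    this.lastHeardMs = nowMs;
  }

  /** Call for every message from downstream, whether an ack or a keepalive. */
  void onMessage(long nowMs) {
    lastHeardMs = nowMs;
  }

  /** True once more than the full timeout has elapsed without any message. */
  boolean isDead(long nowMs) {
    return nowMs - lastHeardMs > timeoutMs;
  }

  /** Interval at which this node should send its own message upstream. */
  long keepaliveIntervalMs() {
    return timeoutMs / 2;
  }
}
```

Sending at half the timeout period leaves room for one lost or delayed message before the upstream node gives up.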

          dhruba borthakur added a comment -

          This is a very preliminary patch that packetizes writes from clients. It does not do any error recovery at all.

          We discussed a proposal where datanodes do local recovery. If a datanode fails, the datanode immediately preceding it will recreate the pipeline by skipping the failed datanode and connecting directly to the datanode that followed it. This approach has the disadvantage that in the case of multiple failures, two upstream datanodes might be in recovery and both of them might try to resend the block to a downstream datanode simultaneously. This might be a difficult case to handle.

          Also, the earlier proposal generated an exception to the client if the primary datanode fails. This might be a commonly occurring case. If we want to avoid this problem, then the client has to do recovery (over and above any datanodes doing local recovery). In this case, maybe it is better to drive the entire recovery from a single point: the client.

          The cascading timeouts issue has to be handled somehow. Your proposal of setting different timeouts for datanodes in the pipeline will work but it might be a little tricky to implement and debug. Another approach would be for each datanode to expose a new "ping" RPC. The client, when it has to recover, "pings" each datanode and determines which of them are not responding. This seems workable, doesn't it?
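
A rough sketch of the ping-based approach: the client probes every datanode in the pipeline and keeps only those that answer. The PipelineProbe class is hypothetical, datanodes are represented by plain strings, and the Predicate stands in for the proposed "ping" RPC, whose real signature is not defined in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class PipelineProbe {
  /** Returns the datanodes that responded to the ping, in pipeline order. */
  static List<String> survivors(List<String> pipeline, Predicate<String> ping) {
    List<String> alive = new ArrayList<>();
    for (String datanode : pipeline) {
      if (ping.test(datanode)) { // stand-in for the proposed "ping" RPC
        alive.add(datanode);
      }
    }
    return alive;
  }
}
```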

          Doug Cutting added a comment -

          This still appears to have the cascading timeout issue, no? Each stage in the pipeline must have a smaller timeout than the prior stage or else the whole pipeline will fail when any node fails. In particular, the client must use a much larger timeout, since it must permit the primary to potentially replay the entire block downstream. Perhaps there can be multiple kinds of acks, some which just indicate that the primary is still alive and others that indicate that replication is complete? (Acks might include the current level of replication.) That might help distinguish the cases where the primary has actually gone down from those where it is still doing productive work. Then one timeout could be used for communications, and a substantially longer one for awaiting replication.

          I also wonder whether, instead of having so many threads, we might implement this with async i/o. Much of the processing seems simple enough that maintaining a state object for each file being written and using a single thread that selects on sockets and then updates the state might be more efficient. Perhaps it will be simpler to write these with threads, then convert them to async?

          We discussed offline last week a different approach from what you've described here. In that, acks would only signal that the immediately downstream node had written the data, not all downstream nodes. Only at block end or flush would it check that sufficient replicas exist, with a different command. Why have you abandoned this plan?

          An intermediate approach might be to use buffer pools on each datanode in the pipeline. Each would write the buffer locally and queue it to be written downstream. The buffer would only be returned to the pool when both writes complete. A datanode could block when no buffers are available. That might improve throughput.
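
One way to picture the buffer-pool idea is a fixed pool whose buffers carry a count of outstanding writes (local disk and downstream) and return to the pool only when that count reaches zero. This is a sketch under those assumptions; the TwoWriteBufferPool class and its method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class TwoWriteBufferPool {
  static final class Buf {
    final byte[] data;
    final AtomicInteger pendingWrites = new AtomicInteger();
    Buf(int size) { data = new byte[size]; }
  }

  private final BlockingQueue<Buf> free;

  TwoWriteBufferPool(int count, int size) {
    free = new ArrayBlockingQueue<>(count);
    for (int i = 0; i < count; i++) {
      free.add(new Buf(size));
    }
  }

  /** Blocks while no buffer is available, which throttles the sender. */
  Buf acquire() throws InterruptedException {
    Buf b = free.take();
    b.pendingWrites.set(2); // one local-disk write plus one downstream write
    return b;
  }

  /** Call once per finished write; the second call returns the buffer to the pool. */
  void writeCompleted(Buf b) {
    if (b.pendingWrites.decrementAndGet() == 0) {
      free.add(b);
    }
  }

  int available() {
    return free.size();
  }
}
```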

          dhruba borthakur added a comment -

          I agree that the timeout issue does not have a very elegant solution. Here is a new proposal.

          The Client
          --------------
          1. The Client uses a small pool of memory buffers per dfs-output stream. Say, 10 buffers of size 64K each.
          2. A write to the output stream actually copies the user data into one of the buffers, if available. Otherwise the user-write blocks.
          3. A separate thread (one per output stream) sends buffers that are full. Each buffer has metadata that contains a sequence number (locally generated on the client), the length of the buffer and its offset in this block.
          4. Another thread (one per output stream) processes incoming responses. The incoming response has the sequence number of the buffer that the datanode had processed. The client removes that buffer from its queue.

          The Primary Datanode
          ------------------------------
          The primary datanode has two threads per stream. The first thread processes incoming packets from the client, writes them to the downstream datanode and writes them to local disk. The second thread processes responses from downstream datanodes and forwards them back to the client.

          This means that the client gets back an ack only when the packet is persisted on all datanodes. In the future this can be changed so that the client gets an ack when the data is persisted in dfs.replication.min number of datanodes.

          In case the primary datanode encounters an exception while writing to the downstream datanode, it declares the block as bad. It removes the immediate downstream datanode from the pipeline. It makes an RPC to the namenode to abandon the current blockId and *replace* the block id with a new one. It then establishes a new pipeline with the new blockId using the remaining datanodes. It then copies all the data from the local temporary block file to the downstream datanodes using the new blockId.

          The Secondary Datanodes
          ------------------------------------
          The Secondary datanode has two threads per stream. The first thread processes incoming packets from the upstream datanode, writes them to the downstream datanode and writes them to local disk. The second thread processes responses from downstream datanodes and forwards them back to the upstream datanode.

          Each secondary datanode sends its own response and also forwards the responses of all downstream datanodes.
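
The client side of this proposal amounts to a bounded window of buffers identified by sequence number: the writer blocks when all buffers are in flight, and the response thread frees a buffer when its ack arrives. The sketch below tracks only sequence numbers, not the data itself, and the ClientWindow class is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ClientWindow {
  private final int maxBuffers;
  private final Deque<Long> pending = new ArrayDeque<>(); // seqnos awaiting an ack
  private long nextSeqno = 0;

  ClientWindow(int maxBuffers) {
    this.maxBuffers = maxBuffers;
  }

  /** Called by the writer; blocks while every buffer is in flight. */
  synchronized long enqueue() throws InterruptedException {
    while (pending.size() >= maxBuffers) {
      wait(); // releases the monitor until an ack frees a buffer
    }
    long seqno = nextSeqno++; // locally generated sequence number
    pending.addLast(seqno);
    return seqno;
  }

  /** Called by the response thread when the pipeline acks a buffer. */
  synchronized void ack(long seqno) {
    if (pending.remove(Long.valueOf(seqno))) {
      notifyAll(); // wake a writer blocked in enqueue()
    }
  }

  synchronized int inFlight() {
    return pending.size();
  }
}
```

With the proposal's numbers, maxBuffers would be 10 and each buffer would hold 64K of user data.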

          dhruba borthakur added a comment -

          This design depends on the fact that the client can detect the datanode that encountered an error in the pipeline. This patch will fix the issue described in HADOOP-1927.

          Doug Cutting added a comment -

          > The client gets an exception if the primary datanode fails.

          Why can't it simply replace the primary with one of the secondary datanodes and proceed?

          > If a secondary datanode fails, the primary informs the client about this event.

          Since a secondary will typically fail by timing out, the timeout used between the client and the primary must be longer than that used between the primary and secondary, so that the client waits long enough to hear about a failed secondary. And the timeout used between the application and the client must be longer yet. Right? Perhaps we should make all these timeouts proportional to a single configuration parameter, the application timeout?
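
As an illustration of deriving every timeout from one parameter, the sketch below halves the timeout at each hop so that each layer waits longer than the one beneath it. The factor of one half is purely an assumption for the example, and the PipelineTimeouts class is hypothetical.

```java
final class PipelineTimeouts {
  /**
   * Returns {application, client-to-primary, primary-to-secondary} timeouts in ms.
   * The halving at each hop is an assumed ratio, not one taken from the discussion.
   */
  static long[] derive(long applicationTimeoutMs) {
    long clientToPrimary = applicationTimeoutMs / 2;
    long primaryToSecondary = clientToPrimary / 2;
    return new long[] { applicationTimeoutMs, clientToPrimary, primaryToSecondary };
  }
}
```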

          If we wish to ensure that blocks are sufficiently replicated, then we'll block on file close, right?

          Overall, this sounds like an approach worth trying.

          dhruba borthakur added a comment -

          I have the following proposal in mind:

          1. The Client uses a small pool of memory buffers per dfs-output stream. Say, 10 buffers of size 64K each.
          2. A write to the output stream actually copies the user data into one of the buffers, if available. Otherwise the user-write blocks.
          3. A separate thread (one per output stream) sends buffers that are full. Each buffer has metadata that contains a sequence number (locally generated on the client), the length of the buffer and its offset in this block.
          4. Another thread (one per output stream) processes incoming responses. The incoming response has the sequence number of the buffer that the datanode had processed. The client removes that buffer from its queue.
          5. The client gets an exception if the primary datanode fails. If a secondary datanode fails, the primary informs the client about this event.
          6. If any datanode fails, the client removes it from the pipeline and resends all pending buffers to all known good datanodes.
          7. A target datanode remembers the last sequence number that it has previously processed. It forwards the buffer to the next datanode in the pipeline. If the datanode receives a buffer that it has not processed earlier, it writes it to local disk. When the response arrives, it forwards the response back to the client.
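
Item 7 implies that a datanode must tolerate resent buffers after a pipeline failure. A minimal sketch of that check follows, assuming sequence numbers increase monotonically within a block; the ReplayFilter class is hypothetical.

```java
final class ReplayFilter {
  private long lastProcessedSeqno = -1; // nothing processed yet

  /** Returns true if the buffer is new and should be written to local disk. */
  synchronized boolean shouldWrite(long seqno) {
    if (seqno <= lastProcessedSeqno) {
      return false; // already processed: a resend after pipeline recovery
    }
    lastProcessedSeqno = seqno;
    return true;
  }
}
```

In the proposal the buffer is forwarded downstream either way; only the local write is skipped for a duplicate.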

          Doug Cutting added a comment -

          > Another option would be to say that the application gets an error if the Primary datanode fails. Do you think that this is acceptable?

          Perhaps, if it only happens rarely. If, e.g., sorts generally complete on 900 nodes with no such failures, then this is probably acceptable. If the primary datanode is localhost, and if secondary failures are survivable, then this may work well enough.

          Otherwise, how do we recover when a datanode in the pipeline becomes unreachable? Will we use per-buffer acks? The primary datanode won't ack a buffer until all datanodes in the pipeline have it? Then if one datanode fails, we could route around it, initialize its copy of the block from one of the survivors, and continue. The acking will effectively add flow-control, which could be a feature, or could slow things. Datanodes may receive the same buffer twice, so buffers will need revision numbers or somesuch.

          dhruba borthakur added a comment -

          The local disk cache implementation is similar to creating four replicas of the block and then deleting the excess replica when the block is done. This reduces overall cluster throughput and I would like to analyze ways of getting rid of it.

          I agree that I should change the name-description of this JIRA. Will do.

          Raghu Angadi added a comment - - edited

          The JIRA description only talks about parallel writes to datanodes. It does not require removal of the temp file on the client.

          How about just storing the block at the client like we do now and replaying the data if there is an error? It still allows parallel writes to the client. This also does not need any changes/improvements to the datanode protocol. Yes, removing the temp file would be better, but this is no worse than the current implementation.

          dhruba borthakur added a comment -

          If the primary datanode fails, the client can still replay the last-flushed data buffer to the remaining datanodes. The client has to specify the offset in the block where this buffer's contents have to be written. The datanode, given this offset-in-block, can determine whether to do the write or whether the write was already done. The prerequisite is that a client holds on to a buffer until the write is complete on all known good datanodes.
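
A rough sketch of how a datanode might use the offset-in-block to make replayed writes harmless. `BlockReplica` is a hypothetical class for illustration, it keeps the block in memory instead of on disk, and rejecting a write that would leave a gap is an assumption not stated in this thread.

```java
import java.util.Arrays;

/** Hypothetical in-memory stand-in for a block being written on a datanode. */
final class BlockReplica {
    private byte[] data = new byte[0]; // bytes written so far

    /**
     * Applies a buffer at the given offset and returns how many new bytes
     * were appended. A buffer that lies entirely within the bytes already
     * written is treated as a replay and skipped.
     */
    int write(long offsetInBlock, byte[] buf) {
        if (offsetInBlock < 0 || offsetInBlock > data.length) {
            // Assumption: a write that would leave a hole is an error.
            throw new IllegalStateException("write would leave a gap in the block");
        }
        long end = offsetInBlock + buf.length;
        if (end <= data.length) {
            return 0; // already written; nothing to do
        }
        int skip = (int) (data.length - offsetInBlock); // overlapping prefix
        int fresh = buf.length - skip;
        int oldLength = data.length;
        data = Arrays.copyOf(data, oldLength + fresh);
        System.arraycopy(buf, skip, data, oldLength, fresh);
        return fresh;
    }

    long length() {
        return data.length;
    }
}
```

With this check, the client can replay its last buffer to every surviving datanode without knowing which of them had already received it.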

          Another option would be to say that the application gets an error if the Primary datanode fails. Do you think that this is acceptable?

          I think HADOOP-1927 says that if a non-primary datanode dies, the client should detect it and possibly take appropriate action. Currently the client has no way of knowing whether any secondary datanodes have died.

          Doug Cutting added a comment -

          > If a datanode fails to write a buffer to its disk, it is reported back to the client. The client removes this datanode from the pipeline and continues to write to the remaining two datanodes. [ ... ] When the file is closed, the under-replicated blocks will be replicated by the namenode.

          I think the more typical failure mode will be a timeout. I'm also still not sure of the answer to my question: if the first datanode in the pipeline times out, does the write fail, throwing an exception to the client? Or does the client route around the first datanode in the pipeline and continue until all datanodes in the pipeline time out? If so, how can it be sure that the other datanodes have received their copies of prior chunks from the first datanode in the pipeline?

          Also, HADOOP-1927 states that we should fail as soon as any element in the pipeline fails. Do you agree? Currently this would be invisible to clients, since the entire block can be replayed to a new pipeline. But, without a local file, this would force us to fail the write when any element of the pipeline fails. Thoughts?

          dhruba borthakur added a comment -

          Thanks Doug for your comments.

          1. My thinking is as follows: the client has a bunch of small buffers. Say 2 buffers each of size 16K. When the first buffer is full, it writes that buffer to the first datanode in the pipeline. The client meanwhile can continue to fill up the remaining buffer(s). The first datanode, on receipt of this buffer, sends it to the next datanode in the pipeline and also writes it to its local disk.

          2. If a datanode fails to write a buffer to its disk, it is reported back to the client. The client removes this datanode from the pipeline and continues to write to the remaining two datanodes. The file in the bad datanode remains in the "tmp" directory.

          3. When the file is closed, the under-replicated blocks will be replicated by the namenode.
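
The failure handling in points 2 and 3 could be sketched as below. `WritePipeline` is a hypothetical class invented for illustration, and datanodes are identified by plain strings; it only tracks which datanodes are still in the pipeline and how many replicas the namenode would later need to recreate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client-side view of the datanodes receiving one block. */
final class WritePipeline {
    private final List<String> datanodes;
    private final int targetReplication;

    WritePipeline(List<String> datanodes) {
        this.datanodes = new ArrayList<>(datanodes);
        this.targetReplication = datanodes.size();
    }

    /**
     * Removes a datanode that reported a failed write.
     * Returns true if at least one datanode is left to keep writing to.
     */
    boolean reportFailure(String datanode) {
        datanodes.remove(datanode);
        return !datanodes.isEmpty();
    }

    /** Datanodes the client continues to write to. */
    List<String> live() {
        return new ArrayList<>(datanodes);
    }

    /** Replicas left for the namenode to recreate after the file is closed. */
    int missingReplicas() {
        return targetReplication - datanodes.size();
    }
}
```

In this sketch the write only has to fail once `reportFailure` returns false, that is, when no datanode is left in the pipeline.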

          Doug Cutting added a comment -

          > The client will stream data to the datanodes directly [ ... ]

          Some history to be aware of. Long ago writes were tee'd to datanodes directly, and the local file was only used to replay things. Switching it so that writes were always buffered to a local file had two advantages: it radically simplified the code (the tee multiplied the number of failure modes) and it improved performance & reliability. Each datanode had far fewer active connections, since blocks were written in a burst rather than as a trickle.

          How will you handle datanode failures? Since you have no local file to replay, won't those always cause an exception in the client? That will cause tasks to fail, which might be acceptable, now that things are overall more reliable, but, at the time I looked at this (again, long ago) datanode timeouts were frequent enough that this would cause job failure.

          dhruba borthakur added a comment -

          I plan on removing the staging file altogether. The client will stream data to the datanodes directly, possibly in chunks of 64K memory buffers. Detailed design to follow.


            People

            • Assignee:
              dhruba borthakur
              Reporter:
              dhruba borthakur


                Development