Uploaded image for project: 'Thrift'
  1. Thrift
  2. THRIFT-4187

Dart -> Rust Framed cross tests fail

Details

    • Bug
    • Status: Closed
    • Minor
    • Resolution: Fixed
    • None
    • 0.12.0
    • Rust - Library
    • None

    Description

      For some reason the Dart (client) -> Rust (server) framed-transport cross tests fail. Initial investigation shows that somehow the dart client think the socket was closed, which means it doesn't read the message from the underlying transport.

      Attachments

        Issue Links

          Activity

            allengeorge Allen George added a comment -

            So, AFAICT, this happens because the Dart client can't handle framed writes that come in two pieces.

            The Rust server is doing the following:

            1. Write the header bytes (i.e. message length)
            2. Write the body

            In this situation it seems like the dart client can't read a complete frame. Changing the rust server to do the write in one piece (i.e. header and body at the same time) seems to avoid the failure.

            allengeorge Allen George added a comment - So, AFAICT, this happens because the Dart client can't handle framed writes that come in two pieces. The Rust server is doing the following: Write the header bytes (i.e. message length) Write the body In this situation it seems like the dart client can't read a complete frame. Changing the rust server to do the write in one piece (i.e. header and body at the same time) seems to avoid the failure.
            jking3 James E. King III added a comment - - edited

            I'd say the Dart client is at fault. RPC systems should never rely on the size of a frame, since it can change too easily. Just add a VPN client to your system and your MTU might change when connecting to some systems, for example. Then again, having rust send once is more efficient, so that should be done anyway BUT only if it doesn't impose memory requirements that are too onerous to buffer the entire message just to prepend the length.

            jking3 James E. King III added a comment - - edited I'd say the Dart client is at fault. RPC systems should never rely on the size of a frame, since it can change too easily. Just add a VPN client to your system and your MTU might change when connecting to some systems, for example. Then again, having rust send once is more efficient, so that should be done anyway BUT only if it doesn't impose memory requirements that are too onerous to buffer the entire message just to prepend the length.
            allengeorge Allen George added a comment -

            Yeah - the Dart client is definitely at fault. I think it's because it's layering a blocking-style API onto an event-driven system without actually blocking. Basically, the sequence of events is as follows:

            1. Dart socket receives data
            2. Handler invoked, socket data placed into transport read buffer
            3. Thrift code makes read call
            4. Framed transport code reads header (i.e. message size)
            5. Framed transport code immediately reads message body; if none is available (i.e. bytes are 0) it assumes remote side closed

            Obviously the last point is false. The remote may be slow, or message sizes too small or what not. Just changing the linked line to `< 0` wouldn't help AFAICT because there's no way to indicate an error state in the current code. (NOTE: I may be wrong because I don't know Dart)

            allengeorge Allen George added a comment - Yeah - the Dart client is definitely at fault. I think it's because it's layering a blocking-style API onto an event-driven system without actually blocking. Basically, the sequence of events is as follows: Dart socket receives data Handler invoked, socket data placed into transport read buffer Thrift code makes read call Framed transport code reads header (i.e. message size) Framed transport code immediately reads message body; if none is available (i.e. bytes are 0) it assumes remote side closed Obviously the last point is false. The remote may be slow, or message sizes too small or what not. Just changing the linked line to `< 0` wouldn't help AFAICT because there's no way to indicate an error state in the current code. (NOTE: I may be wrong because I don't know Dart)
            allengeorge Allen George added a comment - - edited

            I can try to implement a gathering write in Rust which should both get around this issue (in the short term) and reduce the number of socket calls. The right thing to do is fix the Dart transport - and I try figure out how to do that.

            allengeorge Allen George added a comment - - edited I can try to implement a gathering write in Rust which should both get around this issue (in the short term) and reduce the number of socket calls. The right thing to do is fix the Dart transport - and I try figure out how to do that.
            markerickson-wk Mark Erickson added a comment - - edited

            allengeorge I agree with your conclusions and the assessment that the challenge with the Dart implementation is adapting an async (non-blocking) system to a blocking-style API. If Dart needs to support reading the header in a separate message from the body, then there will be some rework needed to collect messages in the buffer before attempting to read. I haven't put much thought into it lately, but let me know if you get stuck and I (or some of my colleagues) can try to help.

            markerickson-wk Mark Erickson added a comment - - edited allengeorge I agree with your conclusions and the assessment that the challenge with the Dart implementation is adapting an async (non-blocking) system to a blocking-style API. If Dart needs to support reading the header in a separate message from the body, then there will be some rework needed to collect messages in the buffer before attempting to read. I haven't put much thought into it lately, but let me know if you get stuck and I (or some of my colleagues) can try to help.
            allengeorge Allen George added a comment -

            markerickson-wk I think I can put together a first pass in a bit and have you comment on it. I've done some work like this in the past, so it's not entirely new for me. I'll ping you when I've got something you can critique.

            Thanks for letting me know I was looking in the right direction!

            allengeorge Allen George added a comment - markerickson-wk I think I can put together a first pass in a bit and have you comment on it. I've done some work like this in the past, so it's not entirely new for me. I'll ping you when I've got something you can critique. Thanks for letting me know I was looking in the right direction!
            githubbot ASF GitHub Bot added a comment -

            GitHub user allengeorge opened a pull request:

            https://github.com/apache/thrift/pull/1269

            THRIFT-4187 Allow dart framed transport to read incomplete frame

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/allengeorge/thrift allen/thrift-4187-final

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/thrift/pull/1269.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #1269


            commit ebcc53c36cdd34d7d7c33a849ebda1cff3aa7241
            Author: Allen George <allen.george@gmail.com>
            Date: 2017-05-13T17:41:20Z

            THRIFT-4187 Allow dart framed transport to read incomplete frame


            githubbot ASF GitHub Bot added a comment - GitHub user allengeorge opened a pull request: https://github.com/apache/thrift/pull/1269 THRIFT-4187 Allow dart framed transport to read incomplete frame You can merge this pull request into a Git repository by running: $ git pull https://github.com/allengeorge/thrift allen/thrift-4187-final Alternatively you can review and apply these changes as the patch at: https://github.com/apache/thrift/pull/1269.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1269 commit ebcc53c36cdd34d7d7c33a849ebda1cff3aa7241 Author: Allen George <allen.george@gmail.com> Date: 2017-05-13T17:41:20Z THRIFT-4187 Allow dart framed transport to read incomplete frame
            allengeorge Allen George added a comment -

            markerickson-wk I'd really appreciate it if you could give this PR a once-over. FWIW, there no longer seem to be any issues with the Rust server and dart client communicating.

            The only two things I'm not clear about:

            1. Do I have to make my own completer a "sync"
            2. Do I actually have to return something in the Future (right now I'm returning an object, because I don't see the need to return the actual bytes)
            allengeorge Allen George added a comment - markerickson-wk I'd really appreciate it if you could give this PR a once-over. FWIW, there no longer seem to be any issues with the Rust server and dart client communicating. The only two things I'm not clear about: Do I have to make my own completer a "sync" Do I actually have to return something in the Future (right now I'm returning an object, because I don't see the need to return the actual bytes)
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116491360

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody() {
              + var remainingBodyBytes = _bodySize - _receivedBodyBytes;
              +
              + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
              + if (got < 0)

              { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }

              +
              + _receivedBodyBytes += got;
              +
              + if (_receivedBodyBytes == _bodySize) {
              + var body = _body;
              +
              + _bodySize = 0;
              + _body = null;
              + _receivedBodyBytes = 0;
              +
              + _setReadBuffer(body);
              +
              + var completer = _frameCompleter;
              + _frameCompleter = null;
              + completer.complete(new Object());

                • End diff –

            `completer` is typed as a `Completer<Uint8List>`, so the Dart analyzer complains about passing `Object` here.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116491360 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); End diff – `completer` is typed as a `Completer<Uint8List>`, so the Dart analyzer complains about passing `Object` here.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116491597

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody()

              Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }

            Future flush() {

            • Uint8List buffer = consumeWriteBuffer();
            • int length = buffer.length;
              + if (_frameCompleter == null) {
              + Uint8List buffer = consumeWriteBuffer();
              + int length = buffer.length;
              +
              + _headerBytes.buffer.asByteData().setUint32(0, length);
              + _transport.write(_headerBytes, 0, headerByteCount);
              + _transport.write(buffer, 0, length);
              +
              + _frameCompleter = new Completer<Object>(); // FIXME: .sync?!
                • End diff –

            Did you mean to address this `FIXME`?

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116491597 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } } Future flush() { Uint8List buffer = consumeWriteBuffer(); int length = buffer.length; + if (_frameCompleter == null) { + Uint8List buffer = consumeWriteBuffer(); + int length = buffer.length; + + _headerBytes.buffer.asByteData().setUint32(0, length); + _transport.write(_headerBytes, 0, headerByteCount); + _transport.write(buffer, 0, length); + + _frameCompleter = new Completer<Object>(); // FIXME: .sync?! End diff – Did you mean to address this `FIXME`?
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on the issue:

            https://github.com/apache/thrift/pull/1269

            There are a couple of related unit tests that are failing in `t_socket_transport_test.dart`. You can run `pub run test test/` from the `lib/dart` directory to see those.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on the issue: https://github.com/apache/thrift/pull/1269 There are a couple of related unit tests that are failing in `t_socket_transport_test.dart`. You can run `pub run test test/` from the `lib/dart` directory to see those.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @allengeorge I think this approach looks pretty good at my first pass. I left a couple of comments, and we'll probably want unit tests around the state transitions to ensure that all paths are covered (setting / unsetting completers, resetting read counters, etc). I'll also ask couple of other devs I know to review (they have done some recent work using framed transports in Dart). Thanks for submitting the PR!

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on the issue: https://github.com/apache/thrift/pull/1269 @allengeorge I think this approach looks pretty good at my first pass. I left a couple of comments, and we'll probably want unit tests around the state transitions to ensure that all paths are covered (setting / unsetting completers, resetting read counters, etc). I'll also ask couple of other devs I know to review (they have done some recent work using framed transports in Dart). Thanks for submitting the PR!
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116509027

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else {
              + _registerForReadableBytes();

                • End diff –

            Does this create a synchronous cycle if the header bytes are not available? `_registerForReadableBytes`>`_readFrame`>`readFrameHeader`>`_registerForReadableBytes`>... It seems like this would cause an unresponsive browser?

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116509027 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); End diff – Does this create a synchronous cycle if the header bytes are not available? `_registerForReadableBytes` >`_readFrame` >`readFrameHeader` >`_registerForReadableBytes` >... It seems like this would cause an unresponsive browser?
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116510886

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else {
              + _registerForReadableBytes();

                • End diff –

            At first I was reading it as causing a cycle, but `_registerForReadableBytes` will make an async call to `_transport.flush`, so I think this is okay on second thought.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116510886 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); End diff – At first I was reading it as causing a cycle, but `_registerForReadableBytes` will make an async call to `_transport.flush`, so I think this is okay on second thought.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116516987

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody() {
              + var remainingBodyBytes = _bodySize - _receivedBodyBytes;
              +
              + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
              + if (got < 0)

              { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }

              +
              + _receivedBodyBytes += got;
              +
              + if (_receivedBodyBytes == _bodySize) {
              + var body = _body;
              +
              + _bodySize = 0;
              + _body = null;
              + _receivedBodyBytes = 0;
              +
              + _setReadBuffer(body);
              +
              + var completer = _frameCompleter;
              + _frameCompleter = null;
              + completer.complete(new Object());

                • End diff –

            I was a little worried about returning the frame bytes, but I guess it doesn't matter. Also, I assume that each call to `iterator` returns a completely separate iterator with a view to the underlying buffer.

            I'll change it. BTW, it's a little weird that `flush()` returns bytes...

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116516987 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); End diff – I was a little worried about returning the frame bytes, but I guess it doesn't matter. Also, I assume that each call to `iterator` returns a completely separate iterator with a view to the underlying buffer. I'll change it. BTW, it's a little weird that `flush()` returns bytes...
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116517213

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody()

              Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }

            Future flush() {

            • Uint8List buffer = consumeWriteBuffer();
            • int length = buffer.length;
              + if (_frameCompleter == null) {
              + Uint8List buffer = consumeWriteBuffer();
              + int length = buffer.length;
              +
              + _headerBytes.buffer.asByteData().setUint32(0, length);
              + _transport.write(_headerBytes, 0, headerByteCount);
              + _transport.write(buffer, 0, length);
              +
              + _frameCompleter = new Completer<Object>(); // FIXME: .sync?!
                • End diff –

            Ah. Actually I need advice from you as to whether this has to be a `sync` `Completer` or not It's not quite clear to me when one would choose one, and whether that's necessary at this point.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116517213 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } } Future flush() { Uint8List buffer = consumeWriteBuffer(); int length = buffer.length; + if (_frameCompleter == null) { + Uint8List buffer = consumeWriteBuffer(); + int length = buffer.length; + + _headerBytes.buffer.asByteData().setUint32(0, length); + _transport.write(_headerBytes, 0, headerByteCount); + _transport.write(buffer, 0, length); + + _frameCompleter = new Completer<Object>(); // FIXME: .sync?! End diff – Ah. Actually I need advice from you as to whether this has to be a `sync` `Completer` or not It's not quite clear to me when one would choose one, and whether that's necessary at this point.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116517412

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else {
              + _registerForReadableBytes();

                • End diff –

            It shouldn't cause a cycle, but I may have missed something? It's the first time I've done anything with Dart.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116517412 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); End diff – It shouldn't cause a cycle, but I may have missed something? It's the first time I've done anything with Dart.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @markerickson-wf My bad - I should definitely have run the unit tests. I'll check them and fix them up. Also, I'll look into how to create a reasonable set of unit tests for the framed transport.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 @markerickson-wf My bad - I should definitely have run the unit tests. I'll check them and fix them up. Also, I'll look into how to create a reasonable set of unit tests for the framed transport.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116519094

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else {
              + _registerForReadableBytes();

                • End diff –

            Yeah, I think it should be okay, sorry for raising the false alarm on my deleted comment.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116519094 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); End diff – Yeah, I think it should be okay, sorry for raising the false alarm on my deleted comment.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116523611

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody() {
              + var remainingBodyBytes = _bodySize - _receivedBodyBytes;
              +
              + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
              + if (got < 0)

              { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }

              +
              + _receivedBodyBytes += got;
              +
              + if (_receivedBodyBytes == _bodySize) {
              + var body = _body;
              +
              + _bodySize = 0;
              + _body = null;
              + _receivedBodyBytes = 0;
              +
              + _setReadBuffer(body);
              +
              + var completer = _frameCompleter;
              + _frameCompleter = null;
              + completer.complete(new Object());

                • End diff –

            Yeah, I don't think you need to type `Completer` at all, and you can just `complete()`. In this case the Future is just a signal of completion, I agree we don't need to return the bytes.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116523611 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); End diff – Yeah, I don't think you need to type `Completer` at all, and you can just `complete()`. In this case the Future is just a signal of completion, I agree we don't need to return the bytes.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on a diff in the pull request:

            https://github.com/apache/thrift/pull/1269#discussion_r116525084

            — Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
            @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport

            { if (got > 0) return got; }
            • _readFrame();
              + // IMPORTANT: by the time you've got here,
              + // an entire frame is available for reading

            return super.read(buffer, offset, length);
            }

            void _readFrame() {

            • _transport.readAll(headerBytes, 0, headerByteCount);
            • int size = headerBytes.buffer.asByteData().getUint32(0);
              + if (_body == null)
              Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }

              +
              + _readFrameBody();
              + }
              +
              + bool _readFrameHeader() {
              + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;

            • if (size < 0) {
              + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
              + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
            • Uint8List buffer = new Uint8List(size);
            • _transport.readAll(buffer, 0, size);
            • _setReadBuffer(buffer);
              + _receivedHeaderBytes += got;
              +
              + if (_receivedHeaderBytes == headerByteCount)
              Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }

              else

              { + _registerForReadableBytes(); + return false; + }

              + }
              +
              + void _readFrameBody()

              Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }

            Future flush() {

            • Uint8List buffer = consumeWriteBuffer();
            • int length = buffer.length;
              + if (_frameCompleter == null) {
              + Uint8List buffer = consumeWriteBuffer();
              + int length = buffer.length;
              +
              + _headerBytes.buffer.asByteData().setUint32(0, length);
              + _transport.write(_headerBytes, 0, headerByteCount);
              + _transport.write(buffer, 0, length);
              +
              + _frameCompleter = new Completer<Object>(); // FIXME: .sync?!
                • End diff –

            https://api.dartlang.org/stable/1.23.0/dart-async/Completer/Completer.sync.html

            Despite the warning above, I used `sync` [in t_http_transport](https://github.com/apache/thrift/blob/master/lib/dart/lib/src/transport/t_http_transport.dart#L52). If I recall correctly, this is a symptom of adapting the sync API to an async system. If two subsequent messages arrive on the socket, the second could stomp on the first before the listener has a chance to react. i.e. `flush`>`complete message 1`>`complete message 2`>`read`>`read`. By using a sync completer, I think the first listener should be able to read before the second message is completed.

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116525084 — Diff: lib/dart/lib/src/transport/t_framed_transport.dart — @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { _transport.readAll(headerBytes, 0, headerByteCount); int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } Uint8List buffer = new Uint8List(size); _transport.readAll(buffer, 0, size); _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } } Future flush() { Uint8List buffer = consumeWriteBuffer(); int length = buffer.length; + if (_frameCompleter == null) { + Uint8List buffer = consumeWriteBuffer(); + int length = buffer.length; + + _headerBytes.buffer.asByteData().setUint32(0, length); + _transport.write(_headerBytes, 0, headerByteCount); + _transport.write(buffer, 0, length); + + _frameCompleter = new Completer<Object>(); // FIXME: .sync?! End diff – https://api.dartlang.org/stable/1.23.0/dart-async/Completer/Completer.sync.html Despite the warning above, I used `sync` [in t_http_transport] ( https://github.com/apache/thrift/blob/master/lib/dart/lib/src/transport/t_http_transport.dart#L52 ). If I recall correctly, this is a symptom of adapting the sync API to an async system. If two subsequent messages arrive on the socket, the second could stomp on the first before the listener has a chance to react. i.e. `flush` >`complete message 1` >`complete message 2` >`read` >`read`. By using a sync completer, I think the first listener should be able to read before the second message is completed.
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            This pull request needs to be completed. nudge

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 This pull request needs to be completed. nudge
            allengeorge Allen George added a comment -

            Ooops. Yup - will get on this soon!

            allengeorge Allen George added a comment - Ooops. Yup - will get on this soon!
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @allengeorge if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 @allengeorge if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            Hi James - I’m really sorry for the delay; slammed at work. I’ll work on it this weekend.

            Again, super sorry about this

            ________________________________
            From: James E. King, III <notifications@github.com>
            Sent: Wednesday, October 25, 2017 10:44:19 AM
            To: apache/thrift
            Cc: Allen George; Mention
            Subject: Re: [apache/thrift] THRIFT-4187 Allow dart framed transport to read incomplete frame (#1269)

            @allengeorge<https://github.com/allengeorge> if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward.

            —
            You are receiving this because you were mentioned.
            Reply to this email directly, view it on GitHub<https://github.com/apache/thrift/pull/1269#issuecomment-339353877>, or mute the thread<https://github.com/notifications/unsubscribe-auth/AAEO9HLOiegU2gF0E4zeUS7XjKsp5ZJlks5sv0lDgaJpZM4NaaLl>.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 Hi James - I’m really sorry for the delay; slammed at work. I’ll work on it this weekend. Again, super sorry about this ________________________________ From: James E. King, III <notifications@github.com> Sent: Wednesday, October 25, 2017 10:44:19 AM To: apache/thrift Cc: Allen George; Mention Subject: Re: [apache/thrift] THRIFT-4187 Allow dart framed transport to read incomplete frame (#1269) @allengeorge< https://github.com/allengeorge > if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward. — You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub< https://github.com/apache/thrift/pull/1269#issuecomment-339353877 >, or mute the thread< https://github.com/notifications/unsubscribe-auth/AAEO9HLOiegU2gF0E4zeUS7XjKsp5ZJlks5sv0lDgaJpZM4NaaLl >.
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            Don't worry, I'm just doing my usual "push things forward" bit. I got the backlog down to 52 open PRs, but that's still about 45 too many for my taste.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 Don't worry, I'm just doing my usual "push things forward" bit. I got the backlog down to 52 open PRs, but that's still about 45 too many for my taste.
            allengeorge Allen George added a comment -

            Just a ping that I'm restarting work on this now. Will re-learn Dart and try write some tests

            allengeorge Allen George added a comment - Just a ping that I'm restarting work on this now. Will re-learn Dart and try write some tests
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            This needs to be rebased and completed.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 This needs to be rebased and completed.
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            This needs to be rebased and completed.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 This needs to be rebased and completed.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            Yup. Will do. I'm a gonna set myself a target to get it done by end of next week. I've been lax on this :/

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 Yup. Will do. I'm a gonna set myself a target to get it done by end of next week. I've been lax on this :/
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @jeking3 Fixed up the failing tests. Confirming that precross works.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 @jeking3 Fixed up the failing tests. Confirming that precross works.
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @jeking3 I've added unit tests around this behavior and enabled more cross tests! Do you want me to roll that into this PR, or should I create another bug and PR?

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 @jeking3 I've added unit tests around this behavior and enabled more cross tests! Do you want me to roll that into this PR, or should I create another bug and PR?
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @jeking3 Any chance this could be merged? TY!

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 @jeking3 Any chance this could be merged? TY!
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            I'd suggest waiting to see how the CI build fares given a change was made. So today is a distinct possibility. Also, you can commit your own changes. There's no rule against it.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 I'd suggest waiting to see how the CI build fares given a change was made. So today is a distinct possibility. Also, you can commit your own changes. There's no rule against it.
            githubbot ASF GitHub Bot added a comment -

            Github user markerickson-wf commented on the issue:

            https://github.com/apache/thrift/pull/1269

            +1

            githubbot ASF GitHub Bot added a comment - Github user markerickson-wf commented on the issue: https://github.com/apache/thrift/pull/1269 +1
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            No problem! I'm sorry I was lax and left it so long :/

            Looks like the tests passed, so it's good to merge. Not 100% of the process, so I'll follow up via email.

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 No problem! I'm sorry I was lax and left it so long :/ Looks like the tests passed, so it's good to merge. Not 100% of the process, so I'll follow up via email.
            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/thrift/pull/1269

            githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/thrift/pull/1269
            githubbot ASF GitHub Bot added a comment -

            Github user jeking3 commented on the issue:

            https://github.com/apache/thrift/pull/1269

            Remember to resolve https://issues.apache.org/jira/browse/THRIFT-4187 as fixed in 0.12.0.

            githubbot ASF GitHub Bot added a comment - Github user jeking3 commented on the issue: https://github.com/apache/thrift/pull/1269 Remember to resolve https://issues.apache.org/jira/browse/THRIFT-4187 as fixed in 0.12.0.
            allengeorge Allen George added a comment -

            Fixed as of b4960838a3b20e6bcf61727f21214a47418a2ca5

            allengeorge Allen George added a comment - Fixed as of b4960838a3b20e6bcf61727f21214a47418a2ca5
            githubbot ASF GitHub Bot added a comment -

            Github user allengeorge commented on the issue:

            https://github.com/apache/thrift/pull/1269

            @jeking3 Yup - just did that. Is there a way to do it automatically, or is it a manual process (for now)?

            githubbot ASF GitHub Bot added a comment - Github user allengeorge commented on the issue: https://github.com/apache/thrift/pull/1269 @jeking3 Yup - just did that. Is there a way to do it automatically, or is it a manual process (for now)?

            People

              allengeorge Allen George
              allengeorge Allen George
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: