Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
None
-
None
Description
For some reason the Dart (client) -> Rust (server) framed-transport cross tests fail. Initial investigation shows that somehow the dart client think the socket was closed, which means it doesn't read the message from the underlying transport.
Attachments
Issue Links
- links to
Activity
I'd say the Dart client is at fault. RPC systems should never rely on the size of a frame, since it can change too easily. Just add a VPN client to your system and your MTU might change when connecting to some systems, for example. Then again, having rust send once is more efficient, so that should be done anyway BUT only if it doesn't impose memory requirements that are too onerous to buffer the entire message just to prepend the length.
Yeah - the Dart client is definitely at fault. I think it's because it's layering a blocking-style API onto an event-driven system without actually blocking. Basically, the sequence of events is as follows:
- Dart socket receives data
- Handler invoked, socket data placed into transport read buffer
- Thrift code makes read call
- Framed transport code reads header (i.e. message size)
- Framed transport code immediately reads message body; if none is available (i.e. bytes are 0) it assumes remote side closed
Obviously the last point is false. The remote may be slow, or message sizes too small or what not. Just changing the linked line to `< 0` wouldn't help AFAICT because there's no way to indicate an error state in the current code. (NOTE: I may be wrong because I don't know Dart)
I can try to implement a gathering write in Rust which should both get around this issue (in the short term) and reduce the number of socket calls. The right thing to do is fix the Dart transport - and I try figure out how to do that.
allengeorge I agree with your conclusions and the assessment that the challenge with the Dart implementation is adapting an async (non-blocking) system to a blocking-style API. If Dart needs to support reading the header in a separate message from the body, then there will be some rework needed to collect messages in the buffer before attempting to read. I haven't put much thought into it lately, but let me know if you get stuck and I (or some of my colleagues) can try to help.
markerickson-wk I think I can put together a first pass in a bit and have you comment on it. I've done some work like this in the past, so it's not entirely new for me. I'll ping you when I've got something you can critique.
Thanks for letting me know I was looking in the right direction!
GitHub user allengeorge opened a pull request:
https://github.com/apache/thrift/pull/1269
THRIFT-4187 Allow dart framed transport to read incomplete frame
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/allengeorge/thrift allen/thrift-4187-final
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/thrift/pull/1269.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1269
commit ebcc53c36cdd34d7d7c33a849ebda1cff3aa7241
Author: Allen George <allen.george@gmail.com>
Date: 2017-05-13T17:41:20Z
THRIFT-4187 Allow dart framed transport to read incomplete frame
markerickson-wk I'd really appreciate it if you could give this PR a once-over. FWIW, there no longer seem to be any issues with the Rust server and dart client communicating.
The only two things I'm not clear about:
- Do I have to make my own completer a "sync"
- Do I actually have to return something in the Future (right now I'm returning an object, because I don't see the need to return the actual bytes)
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116491360
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
{ + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }
+
+ void _readFrameBody() {
+ var remainingBodyBytes = _bodySize - _receivedBodyBytes;
+
+ int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
+ if (got < 0)+
+ _receivedBodyBytes += got;
+
+ if (_receivedBodyBytes == _bodySize) {
+ var body = _body;
+
+ _bodySize = 0;
+ _body = null;
+ _receivedBodyBytes = 0;
+
+ _setReadBuffer(body);
+
+ var completer = _frameCompleter;
+ _frameCompleter = null;
+ completer.complete(new Object());-
- End diff –
-
`completer` is typed as a `Completer<Uint8List>`, so the Dart analyzer complains about passing `Object` here.
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116491597
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
+
+ void _readFrameBody()Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }
Future flush() {
- Uint8List buffer = consumeWriteBuffer();
- int length = buffer.length;
+ if (_frameCompleter == null) {
+ Uint8List buffer = consumeWriteBuffer();
+ int length = buffer.length;
+
+ _headerBytes.buffer.asByteData().setUint32(0, length);
+ _transport.write(_headerBytes, 0, headerByteCount);
+ _transport.write(buffer, 0, length);
+
+ _frameCompleter = new Completer<Object>(); // FIXME: .sync?!-
- End diff –
-
Did you mean to address this `FIXME`?
Github user markerickson-wf commented on the issue:
https://github.com/apache/thrift/pull/1269
There are a couple of related unit tests that are failing in `t_socket_transport_test.dart`. You can run `pub run test test/` from the `lib/dart` directory to see those.
Github user markerickson-wf commented on the issue:
https://github.com/apache/thrift/pull/1269
@allengeorge I think this approach looks pretty good at my first pass. I left a couple of comments, and we'll probably want unit tests around the state transitions to ensure that all paths are covered (setting / unsetting completers, resetting read counters, etc). I'll also ask couple of other devs I know to review (they have done some recent work using framed transports in Dart). Thanks for submitting the PR!
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116509027
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else {
+ _registerForReadableBytes();-
- End diff –
-
Does this create a synchronous cycle if the header bytes are not available? `_registerForReadableBytes`>`_readFrame`>`readFrameHeader`>`_registerForReadableBytes`>... It seems like this would cause an unresponsive browser?
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116510886
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else {
+ _registerForReadableBytes();-
- End diff –
-
At first I was reading it as causing a cycle, but `_registerForReadableBytes` will make an async call to `_transport.flush`, so I think this is okay on second thought.
Github user allengeorge commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116516987
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
{ + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }
+
+ void _readFrameBody() {
+ var remainingBodyBytes = _bodySize - _receivedBodyBytes;
+
+ int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
+ if (got < 0)+
+ _receivedBodyBytes += got;
+
+ if (_receivedBodyBytes == _bodySize) {
+ var body = _body;
+
+ _bodySize = 0;
+ _body = null;
+ _receivedBodyBytes = 0;
+
+ _setReadBuffer(body);
+
+ var completer = _frameCompleter;
+ _frameCompleter = null;
+ completer.complete(new Object());-
- End diff –
-
I was a little worried about returning the frame bytes, but I guess it doesn't matter. Also, I assume that each call to `iterator` returns a completely separate iterator with a view to the underlying buffer.
I'll change it. BTW, it's a little weird that `flush()` returns bytes...
Github user allengeorge commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116517213
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
+
+ void _readFrameBody()Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }
Future flush() {
- Uint8List buffer = consumeWriteBuffer();
- int length = buffer.length;
+ if (_frameCompleter == null) {
+ Uint8List buffer = consumeWriteBuffer();
+ int length = buffer.length;
+
+ _headerBytes.buffer.asByteData().setUint32(0, length);
+ _transport.write(_headerBytes, 0, headerByteCount);
+ _transport.write(buffer, 0, length);
+
+ _frameCompleter = new Completer<Object>(); // FIXME: .sync?!-
- End diff –
-
Ah. Actually I need advice from you as to whether this has to be a `sync` `Completer` or not It's not quite clear to me when one would choose one, and whether that's necessary at this point.
Github user allengeorge commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116517412
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else {
+ _registerForReadableBytes();-
- End diff –
-
It shouldn't cause a cycle, but I may have missed something? It's the first time I've done anything with Dart.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
@markerickson-wf My bad - I should definitely have run the unit tests. I'll check them and fix them up. Also, I'll look into how to create a reasonable set of unit tests for the framed transport.
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116519094
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else {
+ _registerForReadableBytes();-
- End diff –
-
Yeah, I think it should be okay, sorry for raising the false alarm on my deleted comment.
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116523611
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
{ + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + }
+
+ void _readFrameBody() {
+ var remainingBodyBytes = _bodySize - _receivedBodyBytes;
+
+ int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
+ if (got < 0)+
+ _receivedBodyBytes += got;
+
+ if (_receivedBodyBytes == _bodySize) {
+ var body = _body;
+
+ _bodySize = 0;
+ _body = null;
+ _receivedBodyBytes = 0;
+
+ _setReadBuffer(body);
+
+ var completer = _frameCompleter;
+ _frameCompleter = null;
+ completer.complete(new Object());-
- End diff –
-
Yeah, I don't think you need to type `Completer` at all, and you can just `complete()`. In this case the Future is just a signal of completion, I agree we don't need to return the bytes.
Github user markerickson-wf commented on a diff in the pull request:
https://github.com/apache/thrift/pull/1269#discussion_r116525084
— Diff: lib/dart/lib/src/transport/t_framed_transport.dart —
@@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport
- _readFrame();
+ // IMPORTANT: by the time you've got here,
+ // an entire frame is available for reading
return super.read(buffer, offset, length);
}
void _readFrame() {
- _transport.readAll(headerBytes, 0, headerByteCount);
- int size = headerBytes.buffer.asByteData().getUint32(0);
+ if (_body == null)Unknown macro: { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + }+
+ _readFrameBody();
+ }
+
+ bool _readFrameHeader() {
+ var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
- if (size < 0) {
+ int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
+ if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); }
- Uint8List buffer = new Uint8List(size);
- _transport.readAll(buffer, 0, size);
- _setReadBuffer(buffer);
+ _receivedHeaderBytes += got;
+
+ if (_receivedHeaderBytes == headerByteCount)Unknown macro: { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + }else
{ + _registerForReadableBytes(); + return false; + }+ }
+
+ void _readFrameBody()Unknown macro: { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } }
Future flush() {
- Uint8List buffer = consumeWriteBuffer();
- int length = buffer.length;
+ if (_frameCompleter == null) {
+ Uint8List buffer = consumeWriteBuffer();
+ int length = buffer.length;
+
+ _headerBytes.buffer.asByteData().setUint32(0, length);
+ _transport.write(_headerBytes, 0, headerByteCount);
+ _transport.write(buffer, 0, length);
+
+ _frameCompleter = new Completer<Object>(); // FIXME: .sync?!-
- End diff –
-
https://api.dartlang.org/stable/1.23.0/dart-async/Completer/Completer.sync.html
Despite the warning above, I used `sync` [in t_http_transport](https://github.com/apache/thrift/blob/master/lib/dart/lib/src/transport/t_http_transport.dart#L52). If I recall correctly, this is a symptom of adapting the sync API to an async system. If two subsequent messages arrive on the socket, the second could stomp on the first before the listener has a chance to react. i.e. `flush`>`complete message 1`>`complete message 2`>`read`>`read`. By using a sync completer, I think the first listener should be able to read before the second message is completed.
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
This pull request needs to be completed. nudge
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
@allengeorge if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
Hi James - I’m really sorry for the delay; slammed at work. I’ll work on it this weekend.
Again, super sorry about this
________________________________
From: James E. King, III <notifications@github.com>
Sent: Wednesday, October 25, 2017 10:44:19 AM
To: apache/thrift
Cc: Allen George; Mention
Subject: Re: [apache/thrift] THRIFT-4187 Allow dart framed transport to read incomplete frame (#1269)
@allengeorge<https://github.com/allengeorge> if you add framed transport in dart to the cross test, that would test a lot... but it would not test error conditions such as this one. Just a reminder, this needs to move forward.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub<https://github.com/apache/thrift/pull/1269#issuecomment-339353877>, or mute the thread<https://github.com/notifications/unsubscribe-auth/AAEO9HLOiegU2gF0E4zeUS7XjKsp5ZJlks5sv0lDgaJpZM4NaaLl>.
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
Don't worry, I'm just doing my usual "push things forward" bit. I got the backlog down to 52 open PRs, but that's still about 45 too many for my taste.
Just a ping that I'm restarting work on this now. Will re-learn Dart and try write some tests
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
This needs to be rebased and completed.
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
This needs to be rebased and completed.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
Yup. Will do. I'm a gonna set myself a target to get it done by end of next week. I've been lax on this :/
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
@jeking3 Fixed up the failing tests. Confirming that precross works.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
@jeking3 I've added unit tests around this behavior and enabled more cross tests! Do you want me to roll that into this PR, or should I create another bug and PR?
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
@jeking3 Any chance this could be merged? TY!
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
I'd suggest waiting to see how the CI build fares given a change was made. So today is a distinct possibility. Also, you can commit your own changes. There's no rule against it.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
No problem! I'm sorry I was lax and left it so long :/
Looks like the tests passed, so it's good to merge. Not 100% of the process, so I'll follow up via email.
Github user jeking3 commented on the issue:
https://github.com/apache/thrift/pull/1269
Remember to resolve https://issues.apache.org/jira/browse/THRIFT-4187 as fixed in 0.12.0.
Github user allengeorge commented on the issue:
https://github.com/apache/thrift/pull/1269
@jeking3 Yup - just did that. Is there a way to do it automatically, or is it a manual process (for now)?
So, AFAICT, this happens because the Dart client can't handle framed writes that come in two pieces.
The Rust server is doing the following:
In this situation it seems like the dart client can't read a complete frame. Changing the rust server to do the write in one piece (i.e. header and body at the same time) seems to avoid the failure.