Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: v1.4.0
    • Component/s: None
    • Labels:
      None
    1. FLUME-1896.patch
      120 kB
      Hari Shreedharan
    2. FLUME-1896-1.patch
      229 kB
      Hari Shreedharan
    3. FLUME-1896-2.patch
      219 kB
      Hari Shreedharan
    4. FLUME-1896-3.patch
      219 kB
      Hari Shreedharan
    5. FLUME-1896-4.patch
      222 kB
      Hari Shreedharan
    6. FLUME-1896-5.patch
      222 kB
      Hari Shreedharan
    7. FLUME-1896-6.patch
      223 kB
      Hari Shreedharan

        Activity

        Hudson added a comment -

        Integrated in flume-trunk #360 (See https://builds.apache.org/job/flume-trunk/360/)
        FLUME-1896: Implement Thrift RpcClient (Revision 60da3d8606415202f966017a18084cb59d3e64d1)

        Result = FAILURE
        brock : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=60da3d8606415202f966017a18084cb59d3e64d1
        Files :

        • flume-ng-legacy-sources/flume-thrift-source/pom.xml
        • flume-ng-sdk/src/main/thrift/flume.thrift
        • flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
        • flume-ng-legacy-sources/flume-thrift-source/src/main/thrift/aslv2
        • flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/Priority.java
        • flume-ng-sdk/src/main/java/org/apache/flume/thrift/ThriftSourceProtocol.java
        • flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEventServer.java
        • flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ResultCode.java
        • flume-ng-sources/flume-scribe-source/src/main/thrift/aslv2
        • flume-ng-sources/flume-scribe-source/src/main/thrift/scribe-source.thrift
        • flume-ng-sdk/src/main/java/org/apache/flume/thrift/ThriftFlumeEvent.java
        • flume-ng-sources/flume-scribe-source/pom.xml
        • flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
        • flume-ng-sdk/src/main/thrift/aslv2
        • flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
        • flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java
        • flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
        • flume-ng-sdk/pom.xml
        • flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
        • flume-ng-sdk/src/main/java/org/apache/flume/thrift/Status.java
        • pom.xml
        • flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/LogEntry.java
        • flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/EventStatus.java
        • flume-ng-legacy-sources/flume-thrift-source/src/main/java/com/cloudera/flume/handlers/thrift/ThriftFlumeEvent.java
        • flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
        • flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/Scribe.java
        Brock Noland added a comment -

        Committed to trunk and 1.4. Thanks Hari!

        Mike Percy added a comment -

        Ahh, yeah they explicitly forbid out-of-order responses, too bad. Nice work.

        Hari Shreedharan added a comment -

        Sorry, removed old patch and posted new one.

        Hari Shreedharan added a comment -

        The sendBase and receiveBase methods of TServiceClient, as of Thrift 0.9.0:

        protected void sendBase(String methodName, TBase args) throws TException {
          // Every call increments the client-wide sequence ID before writing the request.
          oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
          args.write(oprot_);
          oprot_.writeMessageEnd();
          oprot_.getTransport().flush();
        }

        protected void receiveBase(TBase result, String methodName) throws TException {
          // Blocks until the next message arrives on this connection.
          TMessage msg = iprot_.readMessageBegin();
          if (msg.type == TMessageType.EXCEPTION) {
            TApplicationException x = TApplicationException.read(iprot_);
            iprot_.readMessageEnd();
            throw x;
          }
          // The response must carry the ID of the most recently sent request.
          if (msg.seqid != seqid_) {
            throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID,
                methodName + " failed: out of sequence response");
          }
          result.read(iprot_);
          iprot_.readMessageEnd();
        }
        

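        Notice how the sequence IDs are handled: sendBase increments seqid_ for every request, and receiveBase rejects any response whose seqid does not equal the latest value.

        readMessageBegin blocks the calling thread until a message is actually received, which makes client.appendBatch and the other generated calls blocking, since they call receiveBase right after sending.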
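To make the failure mode concrete, here is a toy model in plain Java (no Thrift dependency) of the bookkeeping above: one counter per client and a receive that only accepts the latest ID. The class name SeqIdClient and its in-memory FIFO "wire" are hypothetical; the real client throws TApplicationException (BAD_SEQUENCE_ID) rather than the IllegalStateException used here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model of the sequence-ID check in TServiceClient.sendBase/receiveBase.
 * Not Thrift code: the "wire" is an in-memory FIFO and the fake server
 * answers every request in order, echoing its sequence ID.
 */
class SeqIdClient {
    private int seqid = 0;                                   // plays the role of seqid_
    private final Deque<Integer> wire = new ArrayDeque<>();  // pending responses, oldest first

    /** Like sendBase: bump the shared counter and "send" a request carrying it. */
    int send() {
        ++seqid;
        wire.addLast(seqid);
        return seqid;
    }

    /** Like receiveBase: read the oldest response and compare it with the latest ID sent. */
    int receive() {
        int responseId = wire.removeFirst();
        if (responseId != seqid) {
            // Real Thrift throws TApplicationException(BAD_SEQUENCE_ID) here.
            throw new IllegalStateException(
                "out of sequence response: got " + responseId + ", expected " + seqid);
        }
        return responseId;
    }
}
```

If two threads share one client and both send before either receives, the first receive reads the response to the older request while the counter already holds the newer ID, so the call fails even though the server did nothing wrong.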

        Hari Shreedharan added a comment -

        The underlying blocking calls (append and appendBatch in the generated Thrift client) call Thrift's sendBase method, which increments the sequence ID of the message. If the next response received does not carry the same sequence ID, receiveBase throws a TApplicationException of type BAD_SEQUENCE_ID. In other words, after a request is sent, the next response read from that connection must have the same sequence ID. So our methods need to be synchronized (or guarded by a lock) to make sure no other thread calls append/appendBatch on the same Thrift client in between.

        I think the implementation in the current patch (a pool of Thrift clients) is reasonably clean, and connection pooling is a standard technique in distributed systems.
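A minimal sketch of that locking in plain Java (essentially option #1 from the list of approaches), assuming a hypothetical LockedConnection wrapper in which the Supplier stands in for one generated call such as appendBatch (sendBase followed by receiveBase). The in-flight counter exists only so the example can show that calls never overlap.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper: holds a lock for the whole request/response round
 * trip, so no other thread can send on this connection until the response
 * to the current request has been read.
 */
class LockedConnection<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private int inFlight = 0;     // round trips currently in progress
    private int maxInFlight = 0;  // highest value ever observed

    R call(Supplier<R> roundTrip) {
        lock.lock();
        try {
            inFlight++;
            maxInFlight = Math.max(maxInFlight, inFlight);
            return roundTrip.get();  // send + blocking receive happen under the lock
        } finally {
            inFlight--;
            lock.unlock();
        }
    }

    int maxInFlight() {
        lock.lock();
        try {
            return maxInFlight;
        } finally {
            lock.unlock();
        }
    }
}
```

The lock makes each send/receive pair atomic with respect to other threads, which is what the seqid_ check demands; the cost is that all threads sharing one connection are serialized.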

        Mike Percy added a comment -

        If you did not do anything, i.e. no locks, no thread locals and no client pool, what happens when multiple clients try to use the same handle?

        Hari Shreedharan added a comment -

        I actually implemented #2. I think it is important to keep the semantics the same across all Rpc clients, and the second option does that.

        Mike Percy added a comment -

        Hari, I think option #1 is the most straightforward. We just need to document that the Thrift RPC client only supports one client per connection.

        I'm taking a look at your patch now too.

        Hari Shreedharan added a comment -

        Added some extra methods for Thrift Sink testing. RB (Review Board) would not let me modify files that are not in the repo, so I am adding these changes to this patch.

        Hari Shreedharan added a comment -

        Latest patch from RB (Review Board).

        Hari Shreedharan added a comment -

        Updated the RPC client to use TFastFramedTransport and TCompactProtocol.
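For reference, a sketch of the framing format, assuming TFastFramedTransport stays wire-compatible with TFramedTransport (its Javadoc describes it that way): every message is prefixed with its length as a 4-byte big-endian integer. The helper class Frames is hypothetical and only illustrates the byte layout; the payload would be whatever TCompactProtocol produces. A practical consequence is that the server side presumably has to use a framed transport as well, or it will misread the length prefix as message data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical helper showing the length-prefixed frame layout. */
final class Frames {
    private Frames() {}

    /** Prefix the payload with its length as a 4-byte big-endian int. */
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payload.length);  // DataOutputStream writes big-endian
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    /** Read the length prefix, then exactly that many payload bytes. */
    static byte[] unframe(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);  // throws EOFException on a truncated frame
        return payload;
    }
}
```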

        Hari Shreedharan added a comment -

        Added factory methods, since their absence was interfering with testing the Thrift Source.

        Hari Shreedharan added a comment -

        I filed FLUME-1903 to add methods to RpcClientFactory for the Thrift RPC client.

        Hari Shreedharan added a comment -

        I have verified that Thrift 0.6.1 is wire-compatible with the Thrift legacy source after its upgrade to 0.9.0: I wrote an app that uses Thrift 0.6.1 to send Flume OG-style events to the legacy source running Thrift 0.9.0. So we can go ahead with this upgrade.

        Hari Shreedharan added a comment -

        Tentatively upgrading Thrift to 0.9.0. I have yet to verify that 0.6.1 is wire-compatible with 0.9.0.

        Added thread pooling to the Thrift RPC client. Added a unit test for thread safety.

        Hari Shreedharan added a comment -

        So, I see 3 ways around this:

        1. Make the process/processBatch methods synchronized (or protected by locks), so only one thread can call them at any point in time. This effectively forces user code to manage its own threading, e.g. a pool of threads with a pool of RpcClients, which differs from how Avro RPC works currently.
        2. Manage our own clients: keep a list of Thrift clients inside each RpcClient (the number could be set by a configuration parameter, maxConnections or similar). "Check out" a Thrift client, wait for the call to complete, then "check it in". The open question is the semantics when all connections are checked out. This lets us limit the number of connections, but can potentially have threading issues.
        3. Keep a thread-local Thrift client in the RpcClient. This is probably the safest option from a threading perspective, but can lead to a large number of open connections if the user code uses many threads. Perhaps documenting this is good enough?

        Thoughts?
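Option 2 could look roughly like the sketch below, which answers the "all connections checked out" question in one possible way: wait up to a timeout, then fail. ClientPool, withClient and the maxConnections parameter are hypothetical names, and the type parameter C stands in for a generated Thrift client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool: one full round trip per checkout. */
class ClientPool<C> {
    private final BlockingQueue<C> idle;

    ClientPool(int maxConnections, Supplier<C> factory) {
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections must be >= 1");
        }
        idle = new ArrayBlockingQueue<>(maxConnections);
        for (int i = 0; i < maxConnections; i++) {
            idle.add(factory.get());  // open all connections up front
        }
    }

    <R> R withClient(long timeout, TimeUnit unit, Function<C, R> body)
            throws InterruptedException, TimeoutException {
        C client = idle.poll(timeout, unit);  // "check out", waiting if none is idle
        if (client == null) {
            throw new TimeoutException("all connections are checked out");
        }
        try {
            return body.apply(client);  // only this thread uses the client until check-in
        } finally {
            idle.add(client);  // "check in"
        }
    }
}
```

Because each caller holds a client exclusively between checkout and check-in, every send on a given connection is followed by its own receive, so the sequence-ID check is always satisfied.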

        Hari Shreedharan added a comment -

        It looks like even the async client is not really thread-safe. I think I will need to make the client thread-local so the same RPC client can be used from multiple threads (like the Avro RPC client). I will do that and update the patch.
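A thread-local client (option 3 in the list of approaches) might be sketched like this. PerThreadClient is a hypothetical name and C stands in for a Thrift client; the counter just makes the "one connection per thread" cost visible. A real implementation would also need a way to close connections belonging to threads that have finished, which this sketch omits.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder that gives each calling thread its own client. */
class PerThreadClient<C> {
    private final AtomicInteger created = new AtomicInteger();
    private final ThreadLocal<C> local;

    PerThreadClient(Supplier<C> factory) {
        // Each thread lazily creates its own client the first time it calls get().
        local = ThreadLocal.withInitial(() -> {
            created.incrementAndGet();  // counts connections opened so far
            return factory.get();
        });
    }

    C get() {
        return local.get();
    }

    int connectionsOpened() {
        return created.get();
    }
}
```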

        Hari Shreedharan added a comment -

        Minor updates

        Hari Shreedharan added a comment -

        Correct patch attached.

        Hari Shreedharan added a comment -

        Initial patch, feedback requested.

        Hari Shreedharan added a comment -

        Initial implementation of the Thrift RPC client. I will add some comments explaining why exceptions are handled by catching them and sending the errors back as part of the result. I had to change some of the pom.xml files that generate Thrift Java files, because the mvn-thrift plugin is no longer available from Maven, so I replaced the generation setup with the one I used. Generating the code requires Thrift 0.6.1, since there is no .thrift file for the Scribe source and Thrift-generated code is not backward compatible between 0.6.1 and 0.9.
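As an illustration of catching errors and returning them as part of the result, the sketch below has a server-side handler map any exception to a status value, and a client-side check turn a non-OK status back into an exception. StatusReporting, Status, EventHandler and DeliveryException are hypothetical stand-ins; the types generated for the actual patch (e.g. org.apache.flume.thrift.Status) may differ.

```java
import java.util.List;

/**
 * Hypothetical sketch: failures travel back as a status value instead of
 * as an exception thrown across the RPC boundary.
 */
class StatusReporting {
    /** Hypothetical result type; a real generated enum may have more values. */
    enum Status { OK, FAILED }

    /** Hypothetical server-side hook that does the real work for a batch. */
    interface EventHandler {
        void processBatch(List<byte[]> events) throws Exception;
    }

    /** Hypothetical client-side exception for a batch that was not delivered. */
    static class DeliveryException extends Exception {
        DeliveryException(String message) {
            super(message);
        }
    }

    /** Server side: catch every failure and report it in the result. */
    static Status appendBatch(EventHandler handler, List<byte[]> events) {
        try {
            handler.processBatch(events);
            return Status.OK;
        } catch (Exception e) {
            return Status.FAILED;
        }
    }

    /** Client side: turn a non-OK status back into an exception for the caller. */
    static void checkStatus(Status status) throws DeliveryException {
        if (status != Status.OK) {
            throw new DeliveryException("batch not delivered: " + status);
        }
    }
}
```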


          People

          • Assignee:
            Hari Shreedharan
            Reporter:
            Hari Shreedharan
          • Votes:
            0
            Watchers:
            4

            Dates

            • Created:
              Updated:
              Resolved:
