Flume / FLUME-1244

Implement a load-balancing RpcClient with round-robin and random distribution capabilities.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: v1.2.0
    • Component/s: None
    • Labels: None

      Description

      We now have a load balancing sink processor, implemented via FLUME-1198, which provides the ability to distribute load over a set of sinks via a round-robin or random distribution scheme. A similar implementation should be available for the RpcClient within the client SDK.

      1. FLUME-1244-1.patch
        47 kB
        Arvind Prabhakar
      2. FLUME-1244-2.patch
        47 kB
        Arvind Prabhakar

        Activity

        Arvind Prabhakar added a comment -

        Here is my attempt at Load Balancing RPC client. Some details:

        • To activate the client, set the property client.type to default_loadbalance.
        • It requires at least two hosts, specified via hosts = h1 h2 and hosts.h1 = hostname:port, etc.
        • By default it uses a round-robin scheme to distribute load over the various hosts.
        • To use random distribution instead of round-robin, set host-selector to random; otherwise it can be left as round_robin.
        • To specify your own host selection algorithm, implement the interface org.apache.flume.api.LoadBalancingRpcClient.HostSelector.
        • This implementation uses optimistic failover: a failure while handling an event send request causes the client to automatically advance to the next available host specified by the selector.
        • If all hosts fail, an EventDeliveryException is raised.
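Putting the properties above together, a client configuration might look like the following sketch (the hostnames and ports are illustrative, not from this issue):

```properties
# Activate the load-balancing RPC client
client.type = default_loadbalance

# At least two hosts are required
hosts = h1 h2
hosts.h1 = flume-host-1.example.com:41414
hosts.h2 = flume-host-2.example.com:41414

# Optional: round_robin (the default) or random
host-selector = random
```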
        Mike Percy added a comment -

        Reviewing this patch now.

        Mike Percy added a comment -

        Nice work. This functionality is sorely needed!

        I have the following code review feedback:

        LoadBalancingRpcClient.java
        ===========================

        In getClient(): if (!client.isActive()), then we should close() the RpcClient before replacing the reference. That call cleans up the Netty transceiver threads in the base client. Also, (!client.isActive()) should be an "else if" condition, otherwise I think it's possible to create two clients in a row very quickly in a failure case when the client starts as null and remains inactive/dead after creation due to the host being down.
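The "else if" shape being suggested here could be sketched as follows; note that RpcClient and the factory below are hypothetical stand-ins for illustration, not Flume's actual SDK types:

```java
import java.util.function.Supplier;

// Stand-in for the real client interface; only the two methods
// relevant to the review comment are modeled.
interface RpcClient {
    boolean isActive();
    void close();
}

public class ClientHolder {
    private RpcClient client;

    // Create the client only when missing; otherwise recycle a dead one.
    // The else-if ensures that when a freshly created client is already
    // inactive (e.g. the host is down), we do not immediately create a
    // second client in the same call.
    RpcClient getClient(Supplier<RpcClient> factory) {
        if (client == null) {
            client = factory.get();
        } else if (!client.isActive()) {
            client.close();          // release transceiver threads first
            client = factory.get();  // then replace the reference
        }
        return client;
    }
}
```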

        In append(): I wonder how expensive it is to create a new HostInfo Iterator on every append() operation. A higher level caching construct or iteration abstraction might be more efficient. However the RoundRobinHostSelector implementation is nice and elegant. This is just a note and I think we can profile / optimize this later if needed.
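As a side note on iteration cost: the round-robin idea itself can be kept very small. A hypothetical, self-contained sketch (not Flume's actual RoundRobinHostSelector) might look like:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin selector sketch: each call returns the next item,
// wrapping around, so successive sends spread load evenly across hosts.
public class RoundRobinSelector<T> {
    private final List<T> items;
    private final AtomicInteger next = new AtomicInteger();

    public RoundRobinSelector(List<T> items) {
        this.items = items;
    }

    public T select() {
        // floorMod keeps the index non-negative even after counter overflow
        int idx = Math.floorMod(next.getAndIncrement(), items.size());
        return items.get(idx);
    }
}
```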

        In close(): This method should call close() on all iterated RPC clients. !isActive() does not imply that close() does not need to be called... there are cases where a NettyRpcClient can be NEW or DEAD and have an active thread pool. Also, close() is specified as an idempotent operation, but maybe that needs to be clarified better in the RpcClient interface.
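The idempotency contract mentioned here can be sketched independently of Flume; the guard flag below is a hypothetical illustration of "safe to call close() any number of times":

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of an idempotent close(): repeated calls are safe, and the
// release logic runs exactly once.
public class IdempotentResource {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int releaseCount = 0;

    public void close() {
        // compareAndSet flips the flag at most once, even if close()
        // is called repeatedly or from multiple threads.
        if (closed.compareAndSet(false, true)) {
            releaseCount++; // stand-in for shutting down thread pools etc.
        }
    }

    public int getReleaseCount() { return releaseCount; }
}
```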

        In RandomOrderHostSelector.createHostIterator(): This line is a bit disconcerting: indexOrder[indexList.size() - 1] = indexList.remove(pick);
        It would be good to store indexList.size() in a local variable so we don't have to consult the JLS to figure out the order of operations... BTW, wouldn't the RHS be evaluated first? Or would the expression inside the square brackets happen first?
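On the evaluation-order question: per JLS 15.26.1, in an array assignment the array reference and the index expression are evaluated before the right-hand side, so indexList.size() is read before remove() shrinks the list. A small self-contained demonstration:

```java
import java.util.ArrayList;
import java.util.List;

public class EvalOrderDemo {
    public static int[] run() {
        List<Integer> indexList = new ArrayList<>(List.of(10, 20, 30));
        int[] indexOrder = new int[3];
        // Per JLS 15.26.1, the index expression is evaluated before the
        // right-hand side: indexList.size() is still 3 here, so the write
        // goes to indexOrder[2]; only then does remove(1) shrink the list.
        indexOrder[indexList.size() - 1] = indexList.remove(1);
        return indexOrder;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run())); // [0, 0, 20]
    }
}
```

Storing the size in a local variable first, as suggested, expresses the same thing without requiring the reader to know this rule.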

        TestLoadBalancingRpcClient.java
        ===============================

        Nit: the test method names should start with a lowercase letter, per standard Java method naming.

        Minor: in TestLbClientTenHostRoundRobinDistribution() and TestLbClientTenHostRoundRobinDistributionBatch(): We do not have to rely on distribution approximation for the asserts here. Since there is no randomness, we can count the # of events and assert exactly as in some of the other non-randomized tests.

        That's all I have. Thanks for adding this important feature, all in all it looks quite good.

        Arvind Prabhakar added a comment -

        Thanks for the review Mike! Great feedback as usual. See my comments below:

        In getClient(): if (!client.isActive()), then we should close() the RpcClient before replacing the reference. That call cleans up the Netty transceiver threads in the base client. Also, (!client.isActive()) should be an "else if" condition, otherwise I think it's possible to create two clients in a row very quickly in a failure case when the client starts as null and remains inactive/dead after creation due to the host being down.

        Done.

        In append(): I wonder how expensive it is to create a new HostInfo Iterator on every append() operation. A higher level caching construct or iteration abstraction might be more efficient. However the RoundRobinHostSelector implementation is nice and elegant. This is just a note and I think we can profile / optimize this later if needed.

        Agreed. Let's profile it and see. The thought did cross my mind as well.

        In close(): This method should call close() on all iterated RPC clients. !isActive() does not imply that close() does not need to be called... there are cases where a NettyRpcClient can be NEW or DEAD and have an active thread pool. Also, close() is specified as an idempotent operation, but maybe that needs to be clarified better in the RpcClient interface.

        Done.

        In RandomOrderHostSelector.createHostIterator(): This line is a bit disconcerting: indexOrder[indexList.size() - 1] = indexList.remove(pick);

        One of my finer moments.

        It would be good to store indexList.size() in a local variable so we don't have to consult the JLS to figure out the order of operations... BTW, wouldn't the RHS be evaluated first? Or would the expression inside the square brackets happen first?

        I refactored it to make it easy on the eyes. Apologies for having cryptic code.

        Nit: the test method names should start with a lowercase letter for Java standard method naming

        Another one of my finer moments. Fixed it.

        Minor: in TestLbClientTenHostRoundRobinDistribution() and TestLbClientTenHostRoundRobinDistributionBatch(): We do not have to rely on distribution approximation for the asserts here. Since there is no randomness, we can count the # of events and assert exactly as in some of the other non-randomized tests.

        I will leave it as is for now; at one point I was experimenting with different host counts and event sizes, and rounding offsets made the approximate comparison more robust. But I agree - in the current state it does not make much difference.

        That's all I have. Thanks for adding this important feature, all in all it looks quite good.

        Thanks again for a thorough review Mike.

        Arvind Prabhakar added a comment -

        Patch with review feedback incorporated.

        Mike Percy added a comment -

        +1

        Mike Percy added a comment -

        Patch committed. Thanks Arvind!

        Mubarak Seyed added a comment -

        Thanks Arvind/Mike for the patch. Will test it out.

        Hudson added a comment -

        Integrated in flume-trunk #228 (See https://builds.apache.org/job/flume-trunk/228/)
        FLUME-1244. Implement a load-balancing RpcClient.

        (Arvind Prabhakar via Mike Percy) (Revision 1347827)

        Result = SUCCESS
        mpercy : http://svn.apache.org/viewvc/?view=rev&rev=1347827
        Files :

        • /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
        • /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
        • /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/FailoverRpcClient.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/HostInfo.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/util
        • /incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/util/SpecificOrderIterator.java
        • /incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
        • /incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
        • /incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java

          People

          • Assignee: Arvind Prabhakar
          • Reporter: Arvind Prabhakar
          • Votes: 0
          • Watchers: 5