KAFKA-257

Hadoop producer should use software load balancer

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: contrib
    • Labels: None

      Description

      Improvements to Kafka's Hadoop producer:

      • Uses the new Producer API (as opposed to SyncProducer)
      • Supports a software load balancer (the Kafka URI for this is kafka+zk://<zk-path>#<topic-name>; see the sketch below)
      • Can set the compression codec (the default is 0, i.e. no compression)
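
      For illustration, a minimal sketch of how a Hadoop job might be wired to this output format through the load-balanced URI. The property names ("kafka.output.url", "kafka.output.compression_codec") and the class shape are assumptions based on the description above, not verbatim from the patch:

          import org.apache.hadoop.mapred.JobConf;

          // Hypothetical sketch: point the job at Kafka through the ZooKeeper-based
          // URI so brokers are discovered at runtime (software load balancing)
          // instead of being hard-coded. Property names are assumptions.
          public class KafkaJobSetup {
              public static JobConf configure() {
                  JobConf job = new JobConf();
                  job.set("kafka.output.url", "kafka+zk://zk-host:2181/brokers#my-topic");
                  job.setInt("kafka.output.compression_codec", 0); // 0 = no compression (default)
                  return job;
              }
          }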

        Activity

        Sam Shah created issue
        Sam Shah made changes - Status: Open → Patch Available
        Sam Shah made changes - Attachment: kafka-257.patch [12511946]
        Sam Shah added a comment -

        This has been tested and has been running in production at LinkedIn for one month.

        Neha Narkhede added a comment -

        Thanks for the patch, Sam! Overall, it looks great. I have a few comments/questions -

        1. In the example README, it would be good to change "REGISTER zookeeper-3.3.3.jar;" to "REGISTER zookeeper-3.3.4.jar;". There are known critical bugs in 3.3.3, and the change will save users who might copy-paste the example script as is.

        2. As a default option, is there any particular reason for using the sync producer? The async producer option provides a significant improvement in network bandwidth utilization.

        I have a few more questions/comments about the Hadoop-Kafka bridge; feel free to file another JIRA if you'd rather have them fixed later.

        1. It seems that when the broker.list option is selected, only one broker can be specified. That is fine if the broker is pointing to a VIP/hardware load balancer, but if not, broker.list is a csv of broker_id:broker_host:broker_port. It would be good to support that here.

        2. If kafka.output.producer.type=async, there are a few config options that should be supported. They are listed in AsyncProducerConfigShared.

        3. Does it make sense to also let the user specify a custom Partitioner via the partitioner.class config (a sketch follows below)? If one is not specified, it defaults to kafka.producer.DefaultPartitioner.
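
        For illustration, a minimal sketch of such a custom partitioner. The interface shape (partition(key, numPartitions)) is an assumption about kafka.producer.Partitioner in 0.7, not something defined by this patch:

            // Hedged sketch: route each key to a stable partition by hashing.
            // The generic Partitioner<String> shape is assumed, not verbatim.
            public class ModuloPartitioner implements kafka.producer.Partitioner<String> {
                public int partition(String key, int numPartitions) {
                    // mask the sign bit so the result is always a valid partition index
                    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
                }
            }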

        Sam Shah made changes - Attachment: kafka-257.patch [12511946]
        Sam Shah made changes - Attachment: kafka-257.patch [12512068]
        Sam Shah added a comment -

        Thanks Neha. Answers:

        1. Done. See updated patch.
        2. The KafkaRecordWriter queues up messages up to a configurable amount (default 10MB), so it already amortizes the network bandwidth (see the sketch after these lists). It doesn't make sense to start an async background thread, since the task will have to block anyway to push the data (it's also not a good idea from a node utilization perspective).

        On the Hadoop-Kafka bridge questions:

        1. Yup, that's a good point; the old Hadoop producer only supported one broker in its URI and I didn't fix it. The updated patch allows multiple brokers separated by commas.
        2. Async shouldn't be used (see point 2 above), which is why I didn't add support for the other options.
        3. I haven't come across a use case yet for a custom partitioner. I can add support later; it should be easy.
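
        A minimal sketch of that queue-and-flush pattern, for illustration only; the class shape and the batched send call are assumptions, not the patch's actual KafkaRecordWriter:

            import java.util.ArrayList;
            import java.util.List;

            // Illustrative sketch of the buffering described above: accumulate
            // messages until a byte threshold is crossed, then push the whole
            // batch in one blocking send. Names and the producer call are assumptions.
            public class BufferingWriter {
                private static final long QUEUE_BYTES = 10L * 1024 * 1024; // 10MB default

                private final List<byte[]> queue = new ArrayList<byte[]>();
                private long queuedBytes = 0;

                public void write(byte[] message) {
                    queue.add(message);
                    queuedBytes += message.length;
                    if (queuedBytes >= QUEUE_BYTES) {
                        flush(); // the task blocks here; no background thread needed
                    }
                }

                private void flush() {
                    // producer.send(topic, batch); // assumed: one synchronous batched send
                    queue.clear();
                    queuedBytes = 0;
                }
            }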

        Neha Narkhede added a comment -

        Thanks for the updated patch, Sam.

        1. The broker.list parameter takes a csv list of "broker_id:broker_host:broker_port", and the broker id of each server in this connection URL must be unique. I think the patch allows only one broker id -

        + // e.g. kafka://kafka-server:9000,kafka-server2:9000/foobar
        +
        + final int brokerId = job.getInt("kafka.output.broker_id", KAFKA_DEFAULT_BROKER_ID);
        + StringBuilder brokerListBuilder = new StringBuilder();
        + String delim = "";
        + for (String serverPort : uri.getAuthority().split(",")) {
        +     brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort));
        +     delim = ",";
        + }

        This requirement seems like overkill to me. I've filed KAFKA-258 to address this improvement in the Producer. Until that is fixed, the Producer would require the connection URL to specify a unique broker id per server.

        2. This makes sense. I missed looking at KafkaRecordWriter.

        Sam Shah made changes - Attachment: kafka-257.patch [12512068]
        Sam Shah made changes - Attachment: KAFKA-257.patch [12513201]
        Sam Shah added a comment -

        Thanks Neha. I've updated the patch so that it just enumerates broker ids (until KAFKA-258 is resolved).
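
        For illustration, a hedged sketch of what enumerating the ids could look like; this is an assumption about the updated patch, not its verbatim code:

            // Assumed sketch of the fix: give each server its own enumerated
            // broker id (0, 1, 2, ...) instead of reusing one id for all,
            // until KAFKA-258 lifts the uniqueness requirement.
            static String buildBrokerList(java.net.URI uri) {
                int brokerId = 0;
                StringBuilder brokerListBuilder = new StringBuilder();
                String delim = "";
                for (String serverPort : uri.getAuthority().split(",")) {
                    brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId++, serverPort));
                    delim = ",";
                }
                return brokerListBuilder.toString(); // e.g. "0:kafka-server:9000,1:kafka-server2:9000"
            }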

        Neha Narkhede added a comment -

        Thanks for the patch, Sam! Committed it.

        Neha Narkhede made changes - Status: Patch Available → Resolved; Assignee: Sam Shah [shahsam]; Resolution: Fixed

        Transition                    Time In Source Status   Executions   Last Executer    Last Execution Date
        Open → Patch Available        32s                     1            Sam Shah         26/Jan/12 05:56
        Patch Available → Resolved    11d 17h 16m             1            Neha Narkhede    06/Feb/12 23:12

          People

          • Assignee: Sam Shah
          • Reporter: Sam Shah
          • Votes: 0
          • Watchers: 0
