  1. Apache Storm
  2. STORM-2225

Kafka New API: make simple things simple

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 2.0.0
    • Fix Version/s: 2.0.0, 1.1.0
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      The Kafka spouts in storm-kafka-client use the new Kafka consumer API and are very extensible, but doing very simple things takes way too many lines of code.

      For example, to create a KafkaTridentSpoutOpaque you need the following code (from the example):

          private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
              return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
                              newKafkaSpoutConfig(
                              newKafkaSpoutStreams())));
          }
      
          private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
              return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
                          kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
                      .setOffsetCommitPeriodMs(10_000)
                      .setFirstPollOffsetStrategy(EARLIEST)
                      .setMaxUncommittedOffsets(250)
                      .build();
          }
      
          protected Map<String,Object> newKafkaConsumerProps() {
              Map<String, Object> props = new HashMap<>();
              props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
              props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
              props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("max.partition.fetch.bytes", 200);
              return props;
          }
      
          protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
              return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                      new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
                      .build();
          }
      
          protected KafkaSpoutRetryService newRetryService() {
              return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                      Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
          }
      
          protected KafkaSpoutStreams newKafkaSpoutStreams() {
              return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new String[]{"test-trident","test-trident-1"}).build();
          }
      
          protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
              public TopicsTupleBuilder(String... topics) {
                  super(topics);
              }
              @Override
              public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
                  return new Values(consumerRecord.value());
              }
          }
      

      All of this just so I can have a Trident spout that reads <String, String> values from "localhost:9092" on the topics "test-trident" and "test-trident-1" and outputs the value as the field "str".

      I shouldn't need 50 lines of code for something I can explain in 3 lines of text. It feels like we need better defaults and less overhead on a lot of these things.
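
      To make the "better defaults" idea concrete, here is a rough sketch of a builder that only asks for the values that have no sensible default (the bootstrap servers and the topics) and fills in everything else. SimpleSpoutConfig and all of its methods are hypothetical names made up for illustration; they are not part of storm-kafka-client. The default values (group id, String deserializers, the "str" field, the 10 second commit period) are simply borrowed from the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical config holder, NOT the storm-kafka-client API.
 * It only illustrates a builder whose defaults cover the common case.
 */
final class SimpleSpoutConfig {
    private final Map<String, Object> consumerProps;
    private final List<String> topics;
    private final String outputField;
    private final long offsetCommitPeriodMs;

    private SimpleSpoutConfig(Builder b) {
        this.consumerProps = Collections.unmodifiableMap(new HashMap<>(b.props));
        this.topics = b.topics;
        this.outputField = b.outputField;
        this.offsetCommitPeriodMs = b.offsetCommitPeriodMs;
    }

    /** Only the values without a sensible default are required up front. */
    static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    Map<String, Object> getConsumerProps() { return consumerProps; }
    List<String> getTopics() { return topics; }
    String getOutputField() { return outputField; }
    long getOffsetCommitPeriodMs() { return offsetCommitPeriodMs; }

    static final class Builder {
        // Assumed default, taken from the example in this issue.
        private static final String STRING_DESERIALIZER =
                "org.apache.kafka.common.serialization.StringDeserializer";

        private final Map<String, Object> props = new HashMap<>();
        private final List<String> topics;
        private String outputField = "str";          // assumed default field name
        private long offsetCommitPeriodMs = 10_000L; // assumed default, as in the example

        private Builder(String bootstrapServers, String... topics) {
            if (topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.topics = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(topics)));
            // Standard Kafka consumer property keys, pre-filled with defaults.
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", "kafkaSpoutTestGroup");
            props.put("key.deserializer", STRING_DESERIALIZER);
            props.put("value.deserializer", STRING_DESERIALIZER);
        }

        /** Overrides or adds a single consumer property. */
        Builder setProp(String key, Object value) {
            props.put(key, value);
            return this;
        }

        Builder setOutputField(String field) {
            this.outputField = field;
            return this;
        }

        Builder setOffsetCommitPeriodMs(long ms) {
            this.offsetCommitPeriodMs = ms;
            return this;
        }

        SimpleSpoutConfig build() {
            return new SimpleSpoutConfig(this);
        }
    }

    public static void main(String[] args) {
        // The whole "3 lines of text" description collapses into one call.
        SimpleSpoutConfig conf = SimpleSpoutConfig
                .builder("127.0.0.1:9092", "test-trident", "test-trident-1")
                .build();
        System.out.println(conf.getTopics() + " -> field " + conf.getOutputField());
    }
}
```

      With something shaped like this, the common case is a single builder call, and anything unusual (a different deserializer, a custom commit period) is still reachable through the setters without forcing everyone to spell out the defaults.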

            People

              Assignee: revans2 Robert Joseph Evans
              Reporter: revans2 Robert Joseph Evans
              Votes: 0
              Watchers: 2

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 24h 40m