Apache Storm / STORM-2225

Kafka New API: make simple things simple


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 2.0.0
    • Fix Version/s: 2.0.0, 1.1.0
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      The Kafka spouts in storm-kafka-client use the new API and are very extensible, but doing very simple things takes far too many lines of code.

      For example, to create a KafkaTridentSpoutOpaque, you need the following code (from the example):

          private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
              return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
                              newKafkaSpoutConfig(
                              newKafkaSpoutStreams())));
          }
      
          private KafkaSpoutConfig<String,String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
              return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
                          kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
                      .setOffsetCommitPeriodMs(10_000)
                      .setFirstPollOffsetStrategy(EARLIEST)
                      .setMaxUncommittedOffsets(250)
                      .build();
          }
      
          protected Map<String,Object> newKafkaConsumerProps() {
              Map<String, Object> props = new HashMap<>();
              props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
              props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
              props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("max.partition.fetch.bytes", 200);
              return props;
          }
      
          protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
              return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                      new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
                      .build();
          }
      
          protected KafkaSpoutRetryService newRetryService() {
              return new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
                      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                      Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
          }
      
          protected KafkaSpoutStreams newKafkaSpoutStreams() {
              return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"), new String[]{"test-trident","test-trident-1"}).build();
          }
      
          protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
              public TopicsTupleBuilder(String... topics) {
                  super(topics);
              }
              @Override
              public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
                  return new Values(consumerRecord.value());
              }
          }
      

      All of this just to get a Trident spout that reads <String, String> records from "localhost:9092" on the topics "test-trident" and "test-trident-1" and emits the value as the field "str".
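      Everything in that sentence fits in a handful of parameters. As a minimal, hypothetical sketch (plain Java with no Storm or Kafka dependency; `SimpleSpoutSettings`, `builder`, and `consumerProps` are invented names, not the storm-kafka-client API), a builder could require only the bootstrap servers and topics and fill in the rest from defaults. The defaults below are assumed for illustration and simply mirror the values used in the example above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical settings holder: NOT the storm-kafka-client API.
// Only bootstrap servers and topics are required; everything else has a default.
public final class SimpleSpoutSettings {
    private static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";

    private final String bootstrapServers;
    private final List<String> topics;
    private final String groupId;
    private final long offsetCommitPeriodMs;
    private final int maxUncommittedOffsets;

    private SimpleSpoutSettings(Builder b) {
        this.bootstrapServers = b.bootstrapServers;
        this.topics = Collections.unmodifiableList(Arrays.asList(b.topics));
        this.groupId = b.groupId;
        this.offsetCommitPeriodMs = b.offsetCommitPeriodMs;
        this.maxUncommittedOffsets = b.maxUncommittedOffsets;
    }

    /** The only two things a caller must always specify. */
    public static Builder builder(String bootstrapServers, String... topics) {
        return new Builder(bootstrapServers, topics);
    }

    /** Consumer properties derived from the settings; String deserializers by default. */
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public List<String> topics() { return topics; }
    public long offsetCommitPeriodMs() { return offsetCommitPeriodMs; }
    public int maxUncommittedOffsets() { return maxUncommittedOffsets; }

    public static final class Builder {
        private final String bootstrapServers;
        private final String[] topics;
        // Assumed defaults, copied from the values in the issue's example.
        private String groupId = "kafkaSpoutTestGroup";
        private long offsetCommitPeriodMs = 10_000;
        private int maxUncommittedOffsets = 250;

        private Builder(String bootstrapServers, String... topics) {
            if (topics.length == 0) {
                throw new IllegalArgumentException("at least one topic is required");
            }
            this.bootstrapServers = bootstrapServers;
            this.topics = topics.clone();
        }

        // Optional overrides; callers only touch what differs from the defaults.
        public Builder setGroupId(String groupId) { this.groupId = groupId; return this; }
        public Builder setOffsetCommitPeriodMs(long ms) { this.offsetCommitPeriodMs = ms; return this; }
        public Builder setMaxUncommittedOffsets(int n) { this.maxUncommittedOffsets = n; return this; }

        public SimpleSpoutSettings build() { return new SimpleSpoutSettings(this); }
    }

    public static void main(String[] args) {
        SimpleSpoutSettings s = SimpleSpoutSettings
            .builder("localhost:9092", "test-trident", "test-trident-1")
            .build();
        System.out.println(s.topics());
        System.out.println(s.consumerProps().get("value.deserializer"));
    }
}
```

      The design choice is that the common case is one call with two arguments, while the setters keep every knob reachable for users who need them.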

      I shouldn't need 50 lines of code for something I can explain in 3 lines of text. It feels like we need better defaults and less overhead for many of these things.
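      Part of the overhead is that TopicsTupleBuilder is a whole subclass whose only job is to turn a record into `new Values(consumerRecord.value())`. A minimal, hypothetical sketch (plain Java; `Record` stands in for Kafka's ConsumerRecord, and `SimpleTranslator` and `valueOnly` are invented names, not the storm-kafka-client API) of replacing that subclass with a function plus the names of the fields it emits:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public final class Translators {

    // Stand-in for Kafka's ConsumerRecord so this sketch has no external dependencies.
    public static final class Record<K, V> {
        private final String topic;
        private final K key;
        private final V value;

        public Record(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        public String topic() { return topic; }
        public K key() { return key; }
        public V value() { return value; }
    }

    // Hypothetical translator: a record-to-tuple function plus the field names it emits.
    // A lambda replaces a one-method subclass such as TopicsTupleBuilder.
    public static final class SimpleTranslator<K, V> {
        private final Function<Record<K, V>, List<Object>> func;
        private final List<String> fields;

        public SimpleTranslator(Function<Record<K, V>, List<Object>> func, String... fields) {
            this.func = func;
            this.fields = Collections.unmodifiableList(Arrays.asList(fields.clone()));
        }

        public List<Object> apply(Record<K, V> record) { return func.apply(record); }
        public List<String> fields() { return fields; }
    }

    // A plausible default: emit only the record value, under a caller-chosen field name.
    public static <K, V> SimpleTranslator<K, V> valueOnly(String fieldName) {
        return new SimpleTranslator<K, V>(r -> Collections.<Object>singletonList(r.value()), fieldName);
    }

    public static void main(String[] args) {
        SimpleTranslator<String, String> t = valueOnly("str");
        Record<String, String> r = new Record<>("test-trident", "k1", "hello");
        System.out.println(t.fields() + " -> " + t.apply(r)); // prints "[str] -> [hello]"
    }
}
```

      A custom mapping stays a one-liner as well, e.g. `new SimpleTranslator<String, String>(rec -> Arrays.<Object>asList(rec.topic(), rec.value()), "topic", "str")`.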


          People

            Assignee: Robert Joseph Evans (revans2)
            Reporter: Robert Joseph Evans (revans2)
            Votes: 0
            Watchers: 2


          Time Tracking

            Original estimate: Not specified
            Remaining: 0h
            Logged: 24h 40m
