Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-1363

TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple()

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1, 1.x
    • Fix Version/s: 2.0.0, 1.1.0
    • Component/s: storm-kafka-client
    • Labels:
      None

      Description

      If you look at the updateState API of storm.kafka.trident.TridentKafkaState. When producer is sending data its not handling if the null value is sent by mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as "null" string. There might be case in particular kind of exception user do not want to replay tuple and just report it and with that he needs to return null.

      Also make the members as protected as I need to copy-paste the class to provide my implementation.

      My updateState API looks like this

      public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
            String topic = null;
      		for (TridentTuple tuple : tuples) {
      			if(tuple==null) {
      				continue;
      			}
      
      			Object keyFromTuple = null;
      			try {
      				keyFromTuple = mapper.getKeyFromTuple(tuple);
      				topic = topicSelector.getTopic(tuple);
      				Object messageFromTuple = mapper.getMessageFromTuple(tuple);
      				if (topic != null && messageFromTuple != null) {
      					producer.send(new KeyedMessage(topic, keyFromTuple, messageFromTuple));
      				} else {
      					LOG.warn("skipping key = " + keyFromTuple + ", topic selector returned null.");
      				}
      			} catch (Exception ex) {
      				String errorMsg = "Could not send message with key = " + keyFromTuple + " to topic = " + topic;
      				LOG.warn(errorMsg, ex);
      				throw new FailedException(errorMsg, ex);
      			}
      		}
      	}
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Sachin Sachin Pasalkar
                Reporter:
                Sachin Sachin Pasalkar
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 3h 50m
                  3h 50m