Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-1854

Trident transactional spouts are broken in 1.0.x

VotersWatch issueWatchersLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0, 1.0.1
    • Fix Version/s: None
    • Component/s: trident
    • Labels:
      None

      Description

      In the process of upgrading our Storm code from 0.10.0 to 1.0.0, I've run into an issue with TransactionalTridentKafkaSpout. When running one of our topologies I'm getting the following exception:

      Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Integer
      	at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Coordinator.initializeTransaction(PartitionedTridentSpoutExecutor.java:55) ~[storm-core-1.0.0.jar:1.0.0]
      	at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Coordinator.initializeTransaction(PartitionedTridentSpoutExecutor.java:43) ~[storm-core-1.0.0.jar:1.0.0]
      	at org.apache.storm.trident.spout.TridentSpoutCoordinator.execute(TridentSpoutCoordinator.java:70) ~[storm-core-1.0.0.jar:1.0.0]
      	at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.0.0.jar:1.0.0]
      

      The issue appears to be caused by a change in PartitionedTridentSpoutExecutor between the two versions, specifically this method:

      1.0.0 - https://github.com/apache/storm/blob/v1.0.0/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L51

      public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
          if(currMetadata!=null) {
              return currMetadata;
          } else {
              return _coordinator.getPartitionsForBatch();            
          }
      }
      

      0.10.0 - https://github.com/apache/storm/blob/v0.10.0/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L51

      public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
          if(currMetadata!=null) {
              return currMetadata;
          } else {
              return _coordinator.getPartitionsForBatch();            
          }
      }
      

      This was introduced by: https://github.com/apache/storm/commit/9e4c3df17ffbc737210e606d3d8a9cdae8f86634

      TransactionalTridentKafkaSpout uses List<GlobalPartitionInformation> for its metadata. Generally, transactional spouts should have metadata that is more complex than just an Integer. OpaquePartitionedTridentSpoutExecutor uses Object for its metadata and correctly handles the metadata used by OpaqueTridentKafkaSpout (List<GlobalPartitionInformation>).

      It looks like reverting the metadata type for transactional spouts in PartitionedTridentSpoutExecutor should work, but I haven't tried this yet.

        Attachments

        Issue Links

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              morrigan.jones@gmail.com Morrigan Jones

              Dates

              • Created:
                Updated:
                Resolved:

                Issue deployment