Apache Storm / STORM-3090

The same offset value is used by the same partition number of different topics.


Details

    Description

      In the current implementation of `ZkCoordinator`, deleted partition managers are used as state holders for newly created partition managers. This behaviour was introduced in the scope of this ticket. However, the existing lookup is based only on the partition number:

      // Deleted managers are indexed by partition number only (id.partition),
      // so managers with the same number on different topics collide.
      Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
      for (Partition id : deletedPartitions) {
          deletedManagers.put(id.partition, _managers.remove(id));
      }
      for (PartitionManager manager : deletedManagers.values()) {
          if (manager != null) manager.close();
      }
      LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());

      for (Partition id : newPartitions) {
          // The previous state holder is looked up by partition number alone.
          PartitionManager man = new PartitionManager(
                  _connections,
                  _topologyInstanceId,
                  _state,
                  _topoConf,
                  _spoutConfig,
                  id,
                  deletedManagers.get(id.partition));
          _managers.put(id, man);
      }

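      This is incorrect, because the same task can manage several partitions that share a partition number but belong to different topics. In that case only one deleted manager per partition number survives in the `HashMap` (a later `put` overwrites an earlier one, so which manager wins depends on the iteration order of `deletedPartitions`), and every new partition manager with that number obtains the same offset from that one arbitrary deleted manager. Fetch requests from the new managers that were given another topic's offset then fail with `TopicOffsetOutOfRangeException`. Some of them recover through the logic below when the assigned offset is smaller than the partition's real offset, because the valid `offset` obtained after the failure is then greater than `_emittedToOffset`. The others keep failing with the offset-out-of-range exception, which prevents them from fetching any messages from Kafka.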

      if (offset > _emittedToOffset) {
          _lostMessageCount.incrBy(offset - _emittedToOffset);
          _emittedToOffset = offset;
          LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
      }
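      To make the recovery condition concrete, the following Java sketch simulates the guard above for a single partition. It is a simplification, not the storm-kafka code: `OffsetRecoverySketch`, `nextFetchOffset` and `recovers` are hypothetical names, and the sketch assumes only that the offset obtained after an out-of-range failure lies somewhere inside the partition's valid range `[earliest, latest]`, without assuming which end of the range is chosen.

```java
final class OffsetRecoverySketch {
    // Mirrors the quoted guard: move forward only if the offset obtained after
    // the failure (resetOffset) is greater than the offset already emitted to.
    // Returns the offset the spout would try to fetch from next.
    static long nextFetchOffset(long emittedToOffset, long resetOffset) {
        if (resetOffset > emittedToOffset) {
            return resetOffset;   // guard holds: the spout jumps forward
        }
        return emittedToOffset;   // guard fails: the same offset is retried
    }

    // Assumption: resetOffset lies inside [earliest, latest] (hypothetical model).
    static boolean recovers(long assignedOffset, long earliest, long latest, long resetOffset) {
        if (assignedOffset >= earliest && assignedOffset <= latest) {
            return true;          // no out-of-range failure in the first place
        }
        long next = nextFetchOffset(assignedOffset, resetOffset);
        return next >= earliest && next <= latest;
    }

    public static void main(String[] args) {
        // Hypothetical partition whose valid offsets are [100, 500].
        System.out.println(recovers(10L, 100L, 500L, 100L));  // true: assigned offset too small
        System.out.println(recovers(900L, 100L, 500L, 100L)); // false: assigned offset too large
        System.out.println(recovers(900L, 100L, 500L, 500L)); // false, even if the reset picks the latest offset
    }
}
```

      Under that assumption, an assigned offset below the valid range always recovers, while one above it never satisfies `offset > _emittedToOffset`, so the spout keeps retrying the same out-of-range offset.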
      

      I assume that the state-holder lookup should be based on both the topic and the partition number.
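      A minimal sketch of the difference between the current and the suggested lookup, assuming a hypothetical `DeletedManager` record that carries only a topic, a partition number and an offset, and a hypothetical `TopicPartitionKey` record as the composite key. It does not use the real `Partition` or `PartitionManager` classes; whether `Partition` itself could serve as the key depends on which fields its `equals`/`hashCode` compare, which this sketch does not assume.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StateHolderLookupSketch {
    // Hypothetical stand-in for a deleted partition manager.
    record DeletedManager(String topic, int partition, long offset) {}

    // Hypothetical composite key: topic plus partition number only.
    record TopicPartitionKey(String topic, int partition) {}

    // Current behaviour: keyed by partition number, so managers of different
    // topics that share a number overwrite each other (the last put wins).
    static Map<Integer, DeletedManager> byPartitionNumber(List<DeletedManager> deleted) {
        Map<Integer, DeletedManager> map = new HashMap<>();
        for (DeletedManager m : deleted) {
            map.put(m.partition(), m);
        }
        return map;
    }

    // Suggested behaviour: keyed by topic and partition number, so each
    // topic keeps its own state holder.
    static Map<TopicPartitionKey, DeletedManager> byTopicAndPartition(List<DeletedManager> deleted) {
        Map<TopicPartitionKey, DeletedManager> map = new HashMap<>();
        for (DeletedManager m : deleted) {
            map.put(new TopicPartitionKey(m.topic(), m.partition()), m);
        }
        return map;
    }

    public static void main(String[] args) {
        List<DeletedManager> deleted = List.of(
                new DeletedManager("orders", 0, 1_200L),
                new DeletedManager("payments", 0, 35L));

        Map<Integer, DeletedManager> current = byPartitionNumber(deleted);
        // Both topics' new partition-0 managers would receive the "payments" offset.
        System.out.println(current.size() + " " + current.get(0).offset()); // 1 35

        Map<TopicPartitionKey, DeletedManager> fixed = byTopicAndPartition(deleted);
        System.out.println(fixed.get(new TopicPartitionKey("orders", 0)).offset());   // 1200
        System.out.println(fixed.get(new TopicPartitionKey("payments", 0)).offset()); // 35
    }
}
```

      Java records derive `equals` and `hashCode` from their components, which is what makes `TopicPartitionKey` usable as a `HashMap` key here.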

            People

              choojoyq Nikita Gorbachevski
              Votes: 0
              Watchers: 2

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 4.5h