Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- 1.1.0, 1.0.4, 1.2.2
Description
In the current implementation of `ZkCoordinator`, deleted partition managers are used as state holders for newly created partition managers. This behaviour was introduced in the scope of this ticket. However, the existing lookup is based only on the partition number.
    Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
    for (Partition id : deletedPartitions) {
        deletedManagers.put(id.partition, _managers.remove(id));
    }
    for (PartitionManager manager : deletedManagers.values()) {
        if (manager != null) manager.close();
    }
    LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
    for (Partition id : newPartitions) {
        PartitionManager man = new PartitionManager(
                _connections,
                _topologyInstanceId,
                _state,
                _topoConf,
                _spoutConfig,
                id,
                deletedManagers.get(id.partition));
        _managers.put(id, man);
    }
This is incorrect, because the same task can manage multiple partitions that share a partition number but belong to different topics. In that case, all new partition managers with that number obtain the same offset value from an arbitrary deleted partition manager (a `HashMap` keyed by partition number keeps only one entry per number), and their fetch requests fail with `TopicOffsetOutOfRangeException`. Some of them recover through the logic shown below, when the assigned offset is smaller than the real one, but the others keep failing with the offset out of range exception, which prevents messages from being fetched from Kafka.
    if (offset > _emittedToOffset) {
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }
I assume that the state holder lookup should be based on both the topic and the partition number.
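To illustrate the collision and the proposed direction, here is a minimal, self-contained sketch. It is not the actual `ZkCoordinator` code or the committed fix: `TopicPartitionKey`, the topic names, and the offset values are hypothetical, and a plain `Long` stands in for the state carried by a deleted partition manager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CompositeKeyLookupSketch {

    /** Hypothetical composite key: topic name plus partition number. */
    public static final class TopicPartitionKey {
        final String topic;
        final int partition;

        public TopicPartitionKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TopicPartitionKey)) return false;
            TopicPartitionKey other = (TopicPartitionKey) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Lookup keyed by partition number only: entries from different topics collide. */
    public static Map<Integer, Long> byPartitionOnly() {
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 100L);   // topicA, partition 0
        offsets.put(0, 5000L);  // topicB, partition 0 replaces the topicA entry
        return offsets;
    }

    /** Lookup keyed by topic and partition number: each entry is kept separately. */
    public static Map<TopicPartitionKey, Long> byTopicAndPartition() {
        Map<TopicPartitionKey, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartitionKey("topicA", 0), 100L);
        offsets.put(new TopicPartitionKey("topicB", 0), 5000L);
        return offsets;
    }

    public static void main(String[] args) {
        // Only one entry survives, so topicA would inherit topicB's offset.
        System.out.println(byPartitionOnly().get(0)); // prints 5000

        Map<TopicPartitionKey, Long> fixed = byTopicAndPartition();
        System.out.println(fixed.get(new TopicPartitionKey("topicA", 0))); // prints 100
        System.out.println(fixed.get(new TopicPartitionKey("topicB", 0))); // prints 5000
    }
}
```

With the partition-only map, a new manager for `topicA` partition 0 would start from offset 5000, which may be out of range for that topic. With the composite key, each new manager gets the state of the deleted manager for the same topic and partition, or `null` if there was none.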
Issue Links
- is related to STORM-2296 Kafka spout - no duplicates on topic leader changes (Resolved)