Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.1.3, 1.2.2
-
None
-
None
Description
使用 kafka.topic.wildcard.match =true的时候,如果topic数目大于1,ZkCoordinator.refresh中deletedManagers会出现逻辑错误
只需要将ZkCoordinator@L91: Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
将key修改为topic+partition
在org.apache.storm.kafka.ZkCoordinatorTest中添加了如下测试
//代码占位符 public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort, String topic) { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic); for (int i = 0; i < numPartitions; i++) { globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort)); } return globalPartitionInformation; } @Test public void testTwoTopicPartitionsChange() throws Exception { int numPartitions = 2; int partitionsPerTask = 1; final Set<Partition> unregisterList = new HashSet<>(); Mockito.doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Object[] arguments = invocation.getArguments(); Partition partition = new Partition((Broker) arguments[0], (String) arguments[1], (int) arguments[2], false); unregisterList.add(partition); return null; } }).when(dynamicPartitionConnections).unregister(any(Broker.class), any(String.class), anyInt()); List<ZkCoordinator> coordinatorList = buildCoordinators(partitionsPerTask); ArrayList<GlobalPartitionInformation> prePartitionInformations = Lists.newArrayList(buildPartitionInfo(numPartitions, 9092, "TOPIC1"), buildPartitionInfo(numPartitions, 9092, "TOPIC2")); when(reader.getBrokerInfo()).thenReturn(prePartitionInformations); List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); waitForRefresh(); when(reader.getBrokerInfo()).thenReturn(Lists.newArrayList(buildPartitionInfo(numPartitions, 9093, "TOPIC1"), buildPartitionInfo(numPartitions, 9093, "TOPIC2"))); List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); List<Partition> allPrePartition = KafkaUtils.calculatePartitionsForTask(prePartitionInformations, 1, 0, 0); assertEquals(unregisterList.size(), allPrePartition.size()); for (Partition partition : allPrePartition) { assertTrue(unregisterList.contains(partition)); } }