Description
- Update group membership data to include discovery endpoints
- Enable discovery
We need to attach some host and port information to org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo
This can then be used in org.apache.kafka.streams.processor.internals.StreamPartitionAssignor to build a Map<HostState, Set<TopicPartition>> that should be added to
org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo during the assign call.
When StreamPartitionAssignor.onAssignment is called we should hold on to a copy of the Map<HostState, Set<TopicPartition>> for use by the discovery methods
To enable discovery, implement the below methods on KafkaStreams
/** * @return metadata about all tasks */ Map<HostState, Set<TaskMetadata>> getAllTasks(); /** * @param storeName requested store name * @return metadata about all tasks that include * storeName in this KStreams instance */ Map<HostState, Set<TaskMetadata>> getAllTasksWithStore(String storeName); /** * @param key requested key * @param storeName requested store name * @return metadata about all tasks that include * storeName and key in this KStreams instance */ <K> Map<HostState, Set<TaskMetadata>> getAllTasksWithKey(String storeName, K key);