diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 20db6b7..6f41fc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -215,9 +215,9 @@ public class KafkaAdminClient extends AdminClient {
      * @param exc The exception
      * @param <T> The KafkaFutureImpl result type.
      */
-    private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) {
-        for (KafkaFutureImpl<T> future : futures) {
-            future.completeExceptionally(exc);
+    private static <T> void completeAllExceptionally(Collection<KafkaFuture<T>> futures, Throwable exc) {
+        for (KafkaFuture<T> future : futures) {
+            ((KafkaFutureImpl<T>) future).completeExceptionally(exc);
         }
     }
@@ -1043,7 +1043,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
-        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
+        final Map<String, KafkaFuture<Void>> topicFutures = new HashMap<>(newTopics.size());
         final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
         for (NewTopic newTopic : newTopics) {
             if (topicFutures.get(newTopic.name()) == null) {
@@ -1065,7 +1065,7 @@ public class KafkaAdminClient extends AdminClient {
                 CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
-                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) topicFutures.get(entry.getKey());
                     if (future == null) {
                         log.warn("Server response mentioned unknown topic {}", entry.getKey());
                     } else {
@@ -1078,8 +1078,8 @@ public class KafkaAdminClient extends AdminClient {
                     }
                 }
                 // The server should send back a response for every topic. But do a sanity check anyway.
-                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
-                    KafkaFutureImpl<Void> future = entry.getValue();
+                for (Map.Entry<String, KafkaFuture<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) entry.getValue();
                     if (!future.isDone()) {
                         future.completeExceptionally(new ApiException("The server response did not " +
                             "contain a reference to node " + entry.getKey()));
@@ -1098,7 +1098,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
                                            DeleteTopicsOptions options) {
-        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+        final Map<String, KafkaFuture<Void>> topicFutures = new HashMap<>(topicNames.size());
         for (String topicName : topicNames) {
             if (topicFutures.get(topicName) == null) {
                 topicFutures.put(topicName, new KafkaFutureImpl<Void>());
@@ -1118,7 +1118,7 @@ public class KafkaAdminClient extends AdminClient {
                 DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
                 // Handle server responses for particular topics.
                 for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
-                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) topicFutures.get(entry.getKey());
                     if (future == null) {
                         log.warn("Server response mentioned unknown topic {}", entry.getKey());
                     } else {
@@ -1131,8 +1131,8 @@ public class KafkaAdminClient extends AdminClient {
                     }
                 }
                 // The server should send back a response for every topic. But do a sanity check anyway.
-                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
-                    KafkaFutureImpl<Void> future = entry.getValue();
+                for (Map.Entry<String, KafkaFuture<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) entry.getValue();
                     if (!future.isDone()) {
                         future.completeExceptionally(new ApiException("The server response did not " +
                             "contain a reference to node " + entry.getKey()));
@@ -1183,7 +1183,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeTopicsResult describeTopics(final Collection<String> topicNames,
                                                DescribeTopicsOptions options) {
-        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final Map<String, KafkaFuture<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
             if (!topicFutures.containsKey(topicName)) {
@@ -1209,9 +1209,9 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse response = (MetadataResponse) abstractResponse;
                 // Handle server responses for particular topics.
-                for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : topicFutures.entrySet()) {
                     String topicName = entry.getKey();
-                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
+                    KafkaFutureImpl<TopicDescription> future = (KafkaFutureImpl<TopicDescription>) entry.getValue();
                     Errors topicError = response.errors().get(topicName);
                     if (topicError != null) {
                         future.completeExceptionally(topicError.exception());
@@ -1339,7 +1339,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         final long now = time.milliseconds();
-        final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        final Map<AclBinding, KafkaFuture<Void>> futures = new HashMap<>();
         final List<AclCreation> aclCreations = new ArrayList<>();
         for (AclBinding acl : acls) {
             if (futures.get(acl) == null) {
@@ -1368,7 +1368,7 @@ public class KafkaAdminClient extends AdminClient {
                 List<AclCreationResponse> responses = response.aclCreationResponses();
                 Iterator<AclCreationResponse> iter = responses.iterator();
                 for (AclCreation aclCreation : aclCreations) {
-                    KafkaFutureImpl<Void> future = futures.get(aclCreation.acl());
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) futures.get(aclCreation.acl());
                     if (!iter.hasNext()) {
                         future.completeExceptionally(new UnknownServerException(
                             "The broker reported no creation result for the given ACL."));
@@ -1394,7 +1394,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
         final long now = time.milliseconds();
-        final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>();
+        final Map<AclBindingFilter, KafkaFuture<FilterResults>> futures = new HashMap<>();
         final List<AclBindingFilter> filterList = new ArrayList<>();
         for (AclBindingFilter filter : filters) {
             if (futures.get(filter) == null) {
@@ -1416,7 +1416,7 @@ public class KafkaAdminClient extends AdminClient {
                 List<AclFilterResponse> responses = response.responses();
                 Iterator<AclFilterResponse> iter = responses.iterator();
                 for (AclBindingFilter filter : filterList) {
-                    KafkaFutureImpl<FilterResults> future = futures.get(filter);
+                    KafkaFutureImpl<FilterResults> future = (KafkaFutureImpl<FilterResults>) futures.get(filter);
                     if (!iter.hasNext()) {
                         future.completeExceptionally(new UnknownServerException(
                             "The broker reported no deletion result for the given filter."));
@@ -1445,7 +1445,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources,
                                                  final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
+        final Map<ConfigResource, KafkaFuture<Config>> unifiedRequestFutures = new HashMap<>();
         final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
         // The BROKER resources which we want to describe. We must make a separate DescribeConfigs
@@ -1479,9 +1479,9 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
+                for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : unifiedRequestFutures.entrySet()) {
                     ConfigResource configResource = entry.getKey();
-                    KafkaFutureImpl<Config> future = entry.getValue();
+                    KafkaFutureImpl<Config> future = (KafkaFutureImpl<Config>) entry.getValue();
                     DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
                     if (config == null) {
                         future.completeExceptionally(new UnknownServerException(
@@ -1571,7 +1571,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs,
                                            final AlterConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
+        final Map<ConfigResource, KafkaFuture<Void>> futures = new HashMap<>(configs.size());
         for (ConfigResource configResource : configs.keySet()) {
             futures.put(configResource, new KafkaFutureImpl<Void>());
         }
@@ -1596,8 +1596,8 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             public void handleResponse(AbstractResponse abstractResponse) {
                 AlterConfigsResponse response = (AlterConfigsResponse) abstractResponse;
-                for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
-                    KafkaFutureImpl<Void> future = entry.getValue();
+                for (Map.Entry<ConfigResource, KafkaFuture<Void>> entry : futures.entrySet()) {
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) entry.getValue();
                     ApiException exception = response.errors().get(configResourceToResource(entry.getKey())).exception();
                     if (exception != null) {
                         future.completeExceptionally(exception);
@@ -1617,7 +1617,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public AlterReplicaDirResult alterReplicaDir(Map<TopicPartitionReplica, String> replicaAssignment,
                                                  final AlterReplicaDirOptions options) {
-        final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new HashMap<>(replicaAssignment.size());
+        final Map<TopicPartitionReplica, KafkaFuture<Void>> futures = new HashMap<>(replicaAssignment.size());

         for (TopicPartitionReplica replica : replicaAssignment.keySet()) {
             futures.put(replica, new KafkaFutureImpl<Void>());
@@ -1655,7 +1655,7 @@ public class KafkaAdminClient extends AdminClient {
                     TopicPartition tp = responseEntry.getKey();
                     Errors error = responseEntry.getValue();
                     TopicPartitionReplica replica = new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
-                    KafkaFutureImpl<Void> future = futures.get(replica);
+                    KafkaFutureImpl<Void> future = (KafkaFutureImpl<Void>) futures.get(replica);
                     if (future == null) {
                         handleFailure(new IllegalArgumentException(
                             "The partition " + tp + " in the response from broker " + brokerId + " is not in the request"));
@@ -1678,7 +1678,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
-        final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
+        final Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());

         for (Integer brokerId: brokers) {
             futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
@@ -1698,7 +1698,8 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
-                    KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = futures.get(brokerId);
+                    KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future =
+                        (KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>) futures.get(brokerId);
                     if (response.logDirInfos().size() > 0) {
                         future.complete(response.logDirInfos());
                     } else {
@@ -1718,7 +1719,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeReplicaLogDirResult describeReplicaLogDir(Collection<TopicPartitionReplica> replicas,
                                                              DescribeReplicaLogDirOptions options) {
-        final Map<TopicPartitionReplica, KafkaFutureImpl<ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());
+        final Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> futures = new HashMap<>(replicas.size());

         for (TopicPartitionReplica replica : replicas) {
             futures.put(replica, new KafkaFutureImpl<ReplicaLogDirInfo>());
@@ -1786,7 +1787,8 @@ public class KafkaAdminClient extends AdminClient {

                     for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry: replicaDirInfoByPartition.entrySet()) {
                         TopicPartition tp = entry.getKey();
-                        KafkaFutureImpl<ReplicaLogDirInfo> future = futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
+                        KafkaFutureImpl<ReplicaLogDirInfo> future = (KafkaFutureImpl<ReplicaLogDirInfo>)
+                            futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
                         future.complete(entry.getValue());
                     }
                 }
@@ -1797,6 +1799,6 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }

-        return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+        return new DescribeReplicaLogDirResult(futures);
     }
 }
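
For context, the net effect of the patch is that the per-key future maps handed to the admin *Result classes are declared with the read-only KafkaFuture interface, while the client itself keeps completing the entries through a downcast to KafkaFutureImpl. The following standalone Java sketch illustrates that pattern with the public kafka-clients types; it is not part of the patch, and the class name and the "my-topic" key are made up for illustration. It assumes kafka-clients is on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;

public class FutureDowncastSketch {
    public static void main(String[] args) throws Exception {
        // The map's value type is the read-only KafkaFuture interface, as the
        // *Result classes now expose it, but the stored objects are KafkaFutureImpl.
        Map<String, KafkaFuture<Void>> topicFutures = new HashMap<>();
        topicFutures.put("my-topic", new KafkaFutureImpl<Void>());

        // Internal completion path: downcast to the implementation, mirroring the
        // (KafkaFutureImpl<Void>) casts added throughout the diff.
        KafkaFutureImpl<Void> impl = (KafkaFutureImpl<Void>) topicFutures.get("my-topic");
        impl.complete(null);

        // External view: callers only block on, or inspect, the KafkaFuture.
        topicFutures.get("my-topic").get();
        System.out.println("done = " + topicFutures.get("my-topic").isDone());
    }
}

Code compiled against the KafkaFuture-typed map can wait on or chain the futures but cannot complete them; only code that knows the concrete KafkaFutureImpl type can, which is the encapsulation the diff is after.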