Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
Currently KafkaAdminClient.deleteConsumerGroups, listConsumerGroupOffsets method implementation ignoring FindCoordinatorResponse errors. This causes admin client request timeouts incase of authorization errors. We should handle these errors.
Attachments
Issue Links
- links to
Activity
hachikuji closed pull request #5278: KAFKA-7091: AdminClient should handle FindCoordinatorResponse errors
URL: https://github.com/apache/kafka/pull/5278
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 62b6b6ee752..7e245d1d1d7 100644
— a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2395,16 +2395,9 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
@Override
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
- Errors error = fcResponse.error();
- if (error == Errors.COORDINATOR_NOT_AVAILABLE)
{
- // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary.
- throw error.exception();
- }
else if (error != Errors.NONE)
{ - // All other errors are immediate failures. - KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); - future.completeExceptionally(error.exception()); + + if (handleFindCoordinatorError(fcResponse, futures.get(groupId))) return; - }
final long nowDescribeConsumerGroups = time.milliseconds();
final int nodeId = fcResponse.node().id();
@@ -2476,6 +2469,17 @@ void handleFailure(Throwable throwable)
+ private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
+ Errors error = response.error();
+ if (error.exception() instanceof RetriableException)
else if (response.hasError())
{ + future.completeExceptionally(error.exception()); + return true; + }+ return false;
+ }
+
private final static class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
@@ -2610,6 +2614,9 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+ return;
+
final long nowListConsumerGroupOffsets = time.milliseconds();
final int nodeId = response.node().id();
@@ -2696,6 +2703,9 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI
void handleResponse(AbstractResponse abstractResponse) {
final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+ if (handleFindCoordinatorError(response, futures.get(groupId)))
+ return;
+
final long nowDeleteConsumerGroups = time.milliseconds();
final int nodeId = response.node().id();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 39726dac6b9..bc7f654c0bb 100644
— a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -68,9 +68,11 @@
/**
- Possible error codes:
*
+ * COORDINATOR_LOAD_IN_PROGRESS (14) - COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR (16)
- GROUP_AUTHORIZATION_FAILED (30)
+ * INVALID_REQUEST (42)
+ * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
*/
@@ -107,6 +109,10 @@ public int throttleTimeMs()
{ return throttleTimeMs; }+ public boolean hasError()
{ + return this.error != Errors.NONE; + }+
public Errors error()
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3566f834220..836307902f4 100644
— a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -37,6 +37,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -1071,6 +1072,10 @@ public void testDeleteConsumerGroups() throws Exception {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNode(env.cluster().controller());
+ //Retriable FindCoordinatorResponse errors should be retried
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
final Map<String, Errors> response = new HashMap<>();
@@ -1081,6 +1086,13 @@ public void testDeleteConsumerGroups() throws Exception
}
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index fe98fda8785..9055e68deed 100644
— a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -29,12 +29,13 @@ import kafka.log.LogConfig
import kafka.server.
import org.apache.kafka.clients.admin._
import kafka.utils.
+import kafka.utils.TestUtils._
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.
+import org.apache.kafka.common.
{ConsumerGroupState, TopicPartition, TopicPartitionReplica} import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
@@ -125,18 +126,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}, "timed out waiting for topics")
}
- def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
- try
{
- future.get()
- fail("Expected CompletableFuture.get to return an exception")
- }
catch
{ - case e: ExecutionException => - val cause = e.getCause() - assertTrue("Expected an exception of type " + clazz.getName + "; got type " + - cause.getClass().getName, clazz.isInstance(cause)) - } - }
-
@Test
def testClose(): Unit = {
val client = AdminClient.create(createConfig())
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a21affcdc99..72b3b24b8c0 100644-
- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util. {ArrayList, Collections, Properties}import java.time.Duration
- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
-
-import kafka.admin.AdminClient
import kafka.admin.ConsumerGroupCommand.
import kafka.common.TopicAndPartition
import kafka.log.LogConfig
@@ -27,7 +26,7 @@ import kafka.network.SocketServer
import kafka.security.auth._
import kafka.server.
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
@@ -1004,17 +1003,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.partitionsFor(topic) }
- @Test(expected = classOf[GroupAuthorizationException])
+ @Test
def testDescribeGroupApiWithNoGroupAcl() { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) - createAdminClient().describeConsumerGroup(group) + val result = createAdminClient().describeConsumerGroups(Seq(group).asJava) + TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException]) }
@Test
def testDescribeGroupApiWithGroupDescribe() { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource) - createAdminClient().describeConsumerGroup(group) + createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get() }
@Test
@@ -1036,8 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) - val result = createAdminClient().deleteConsumerGroups(List(group)) - assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE)) + createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get() }
@Test
@@ -1046,14 +1045,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource) this.consumers.head.assign(List(tp).asJava) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) - val result = createAdminClient().deleteConsumerGroups(List(group)) - assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED)) + val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava) + TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException]) }
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl2() { - val result = createAdminClient().deleteConsumerGroups(List(group)) - assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED)) + val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava) + TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException]) }
@Test
@@ -1462,7 +1461,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createAdminClient(): AdminClient = { - val adminClient = AdminClient.createSimplePlaintext(brokerList) + val props = new Properties() + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val adminClient = AdminClient.create(props) adminClients += adminClient adminClient }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f200cc2d55c..07cbf0cf414 100644
— a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,28 +14,35 @@
package kafka.api
import java.util.concurrent._
-import java.util.{Collection, Collections}
+import java.util.{Collection, Collections, Properties}
-import kafka.admin.AdminClient
-import kafka.server.KafkaConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.
{CoreUtils, Logging, ShutdownableThread, TestUtils} import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.
+import org.apache.kafka.clients.producer.
{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import scala.collection.JavaConverters._
+import scala.collection.mutable.Buffer
/**
* Integration tests for the consumer that cover basic usage as well as server failures
*/
-class ConsumerBounceTest extends IntegrationTestHarness with Logging {
+class ConsumerBounceTest extends BaseRequestTest with Logging {
+
+ override def numBrokers: Int = 3
val producerCount = 1
val consumerCount = 2
- val serverCount = 3
+
+ val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
val topic = "topic"
val part = 0
@@ -45,13 +52,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val gracefulCloseTimeMs = 1000
val executor = Executors.newScheduledThreadPool(2)
- // configure the servers and clients
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
- this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
- this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
- this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
- this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+ val producerConfig = new Properties
+ val consumerConfig = new Properties
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -59,8 +61,19 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ def serverConfig(): Properties = { + val properties = new Properties + properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout + properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") + properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true") + properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false") + properties + }
+
override def generateConfigs = { - FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false) + FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false) .map(KafkaConfig.fromProps(_, serverConfig)) }
@@ -68,8 +81,26 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
override def setUp() { super.setUp() + for (_ <- 0 until producerCount) + producers += createProducer + + for (_ <- 0 until consumerCount) + consumers += createConsumer + // create the test topic with all the brokers as replicas - createTopic(topic, 1, serverCount) + createTopic(topic, 1, numBrokers) + }
+
+ def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = { + TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + securityProtocol = SecurityProtocol.PLAINTEXT, + props = Some(producerConfig)) + }
+
+ def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { + TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), + securityProtocol = SecurityProtocol.PLAINTEXT, + props = Some(consumerConfig)) }
@After
@@ -78,6 +109,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { executor.shutdownNow() // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) + producers.foreach(_.close()) + consumers.foreach(_.close()) } finally { super.tearDown() }
@@ -173,7 +206,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer = this.consumers.head
consumer.subscribe(Collections.singleton(newtopic))
executor.schedule(new Runnable { - def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount) + def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers) }, 2, TimeUnit.SECONDS)
consumer.poll(0)
@@ -243,9 +276,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords)
val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords)
- val adminClient = AdminClient.createSimplePlaintext(this.brokerList)
- killBroker(adminClient.findCoordinator(dynamicGroup).id)
- killBroker(adminClient.findCoordinator(manualGroup).id)
+ killBroker(findCoordinator(dynamicGroup))
+ killBroker(findCoordinator(manualGroup))
val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
@@ -255,9 +287,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { restartDeadBrokers() checkClosedState(dynamicGroup, 0) checkClosedState(manualGroup, numRecords) - adminClient.close() }
+ private def findCoordinator(group: String) : Int = { + val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build() + val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR) + val response = FindCoordinatorResponse.parse(resp, ApiKeys.FIND_COORDINATOR.latestVersion()) + response.node().id() + }
+
+
/**
* Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
* there is no coordinator, but close should timeout and return. If close is invoked with a very
@@ -288,7 +327,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
@Test
def testCloseDuringRebalance() {
val topic = "closetest"
- createTopic(topic, 10, serverCount)
+ createTopic(topic, 10, numBrokers)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -355,7 +394,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = { this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) - val consumer = super.createConsumer + val consumer = createConsumer consumers += consumer consumer }
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 9da69370d80..3b63613419d 100644
— a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,6 +18,8 @@ import java.util
import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
+import kafka.utils.TestUtils._
+
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index db451961337..7b68cc0a016 100755
— a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.{Collections, Properties}
-import java.util.concurrent.{Callable, Executors, TimeUnit}
+import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import javax.net.ssl.X509TrustManager
import kafka.api._
@@ -41,7 +41,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
@@ -1374,4 +1374,15 @@ object TestUtils extends Logging
+ def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+ try
catch
{ + case e: ExecutionException => + val cause = e.getCause() + assertTrue("Expected an exception of type " + clazz.getName + "; got type " + + cause.getClass().getName, clazz.isInstance(cause)) + }+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 249e2c3cffd..64b23cb799c 100644
— a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -17,12 +17,12 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -61,9 +61,9 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import kafka.admin.AdminClient;
import kafka.tools.StreamsResetter;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -77,20 +77,15 @@
private static MockTime mockTime;
private static KafkaStreams streams;
private static AdminClient adminClient = null;
- private static KafkaAdminClient kafkaAdminClient = null;
abstract Map<String, Object> getClientSslConfig();
@AfterClass
public static void afterClassCleanup() {
if (adminClient != null)
- if (kafkaAdminClient != null)
{
- kafkaAdminClient.close(10, TimeUnit.SECONDS);
- kafkaAdminClient = null;
- }
}
private String appID = "abstract-reset-integration-test";
@@ -103,9 +98,6 @@ private void prepareEnvironment() {
if (adminClient == null)
- if (kafkaAdminClient == null) { - kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig); - }
boolean timeSet = false;
while (!timeSet) {
@@ -184,8 +176,9 @@ private void prepareConfigs() {
@Override
public boolean conditionMet() {
try
catch (final TimeoutException e)
{ + ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get(); + return groupDescription.members().isEmpty(); + }catch (final ExecutionException | InterruptedException e)
{ return false; }}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
omkreddy opened a new pull request #5278:
KAFKA-7091: AdminClient should handle FindCoordinatorResponse errorsURL: https://github.com/apache/kafka/pull/5278
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org