Kafka / KAFKA-7091

AdminClient should handle FindCoordinatorResponse errors

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: None
    • Labels: None

    Description

      Currently, the KafkaAdminClient.deleteConsumerGroups and listConsumerGroupOffsets method implementations ignore FindCoordinatorResponse errors. This causes admin client request timeouts in case of authorization errors. We should handle these errors.
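
      With this fix, a coordinator lookup failure such as a missing group ACL completes the per-group future exceptionally instead of letting the call stall until the request timeout. Below is a minimal sketch of the caller-visible behavior; the bootstrap address and group name are placeholders:

          import java.util.Collections;
          import java.util.Properties;
          import java.util.concurrent.ExecutionException;

          import org.apache.kafka.clients.admin.AdminClient;
          import org.apache.kafka.clients.admin.AdminClientConfig;
          import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
          import org.apache.kafka.common.errors.GroupAuthorizationException;

          public class DeleteGroupExample {
              public static void main(String[] args) throws InterruptedException {
                  Properties props = new Properties();
                  // Placeholder bootstrap address; point this at a real broker.
                  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                  try (AdminClient admin = AdminClient.create(props)) {
                      // "my-group" is a placeholder consumer group name.
                      DeleteConsumerGroupsResult result =
                          admin.deleteConsumerGroups(Collections.singletonList("my-group"));
                      try {
                          // Before the fix, an authorization error in the FindCoordinator
                          // step left this future hanging until the request timed out;
                          // now the future completes exceptionally and fails fast.
                          result.deletedGroups().get("my-group").get();
                      } catch (ExecutionException e) {
                          if (e.getCause() instanceof GroupAuthorizationException)
                              System.err.println("Not authorized to delete group: " + e.getCause().getMessage());
                      }
                  }
              }
          }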

      Attachments

      Issue Links

      Activity

            githubbot ASF GitHub Bot added a comment -

            omkreddy opened a new pull request #5278: KAFKA-7091: AdminClient should handle FindCoordinatorResponse errors
            URL: https://github.com/apache/kafka/pull/5278

            • Remove scala AdminClient usage from core and streams tests

            Committer Checklist (excluded from commit message)
            • [ ] Verify design and implementation
            • [ ] Verify test coverage and CI build status
            • [ ] Verify documentation (including upgrade notes)

            ----------------------------------------------------------------
            This is an automated message from the Apache Git Service.
            To respond to the message, please log on GitHub and use the
            URL above to go to the specific comment.

            For queries about this service, please contact Infrastructure at:
            users@infra.apache.org

            githubbot ASF GitHub Bot added a comment -

            hachikuji closed pull request #5278: KAFKA-7091: AdminClient should handle FindCoordinatorResponse errors
            URL: https://github.com/apache/kafka/pull/5278

            This is a PR merged from a forked repository.
            As GitHub hides the original diff on merge, it is displayed below for
            the sake of provenance:

            As this is a foreign pull request (from a fork), the diff is supplied
            below (as it won't show otherwise due to GitHub magic):

            diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
            index 62b6b6ee752..7e245d1d1d7 100644
            --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
            +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
            @@ -2395,16 +2395,9 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri
            @Override
            void handleResponse(AbstractResponse abstractResponse) {
            final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;

            - Errors error = fcResponse.error();
            - if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
            -     // Retry COORDINATOR_NOT_AVAILABLE, in case the error is temporary.
            -     throw error.exception();
            - } else if (error != Errors.NONE) {
            -     // All other errors are immediate failures.
            -     KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId);
            -     future.completeExceptionally(error.exception());
            +
            + if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
                  return;
            - }

            final long nowDescribeConsumerGroups = time.milliseconds();
            final int nodeId = fcResponse.node().id();
            @@ -2476,6 +2469,17 @@ void handleFailure(Throwable throwable)
                return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
            }

            + private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
            +     Errors error = response.error();
            +     if (error.exception() instanceof RetriableException) {
            +         throw error.exception();
            +     } else if (response.hasError()) {
            +         future.completeExceptionally(error.exception());
            +         return true;
            +     }
            +     return false;
            + }
            +
            private final static class ListConsumerGroupsResults {
            private final List<Throwable> errors;
            private final HashMap<String, ConsumerGroupListing> listings;
            @@ -2610,6 +2614,9 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou
            void handleResponse(AbstractResponse abstractResponse) {
            final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;

            + if (handleFindCoordinatorError(response, groupOffsetListingFuture))
            + return;
            +
            final long nowListConsumerGroupOffsets = time.milliseconds();

            final int nodeId = response.node().id();
            @@ -2696,6 +2703,9 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI
            void handleResponse(AbstractResponse abstractResponse) {
            final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;

            + if (handleFindCoordinatorError(response, futures.get(groupId)))
            + return;
            +
            final long nowDeleteConsumerGroups = time.milliseconds();

            final int nodeId = response.node().id();
            diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
            index 39726dac6b9..bc7f654c0bb 100644
            — a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
            +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
            @@ -68,9 +68,11 @@
            /**
             * Possible error codes:
             *
            + * COORDINATOR_LOAD_IN_PROGRESS (14)
             * COORDINATOR_NOT_AVAILABLE (15)
             * NOT_COORDINATOR (16)
             * GROUP_AUTHORIZATION_FAILED (30)
            + * INVALID_REQUEST (42)
            + * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
             */

            @@ -107,6 +109,10 @@ public int throttleTimeMs() {
                return throttleTimeMs;
            }

            + public boolean hasError() {
            +     return this.error != Errors.NONE;
            + }
            +
            public Errors error() {
                return error;
            }

            diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
            index 3566f834220..836307902f4 100644
            --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
            +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
            @@ -37,6 +37,7 @@
            import org.apache.kafka.common.config.ConfigResource;
            import org.apache.kafka.common.errors.AuthenticationException;
            import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
            +import org.apache.kafka.common.errors.GroupAuthorizationException;
            import org.apache.kafka.common.errors.InvalidTopicException;
            import org.apache.kafka.common.errors.LeaderNotAvailableException;
            import org.apache.kafka.common.errors.NotLeaderForPartitionException;
            @@ -1071,6 +1072,10 @@ public void testDeleteConsumerGroups() throws Exception {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
            env.kafkaClient().setNode(env.cluster().controller());

            + //Retriable FindCoordinatorResponse errors should be retried
            + env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
            + env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
            +
            env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

            final Map<String, Errors> response = new HashMap<>();
            @@ -1081,6 +1086,13 @@ public void testDeleteConsumerGroups() throws Exception {
                    final KafkaFuture<Void> results = result.deletedGroups().get("group-0");
                    assertNull(results.get());
            +
            +        //should throw error for non-retriable errors
            +        env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));
            +
            +        final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
            +        assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
            +    }
                }

            diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
            index fe98fda8785..9055e68deed 100644
            --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
            +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
            @@ -29,12 +29,13 @@ import kafka.log.LogConfig
            import kafka.server.{Defaults, KafkaConfig, KafkaServer}
            import org.apache.kafka.clients.admin._
            import kafka.utils.{Logging, TestUtils}
            +import kafka.utils.TestUtils._
            import kafka.utils.Implicits._
            import org.apache.kafka.clients.admin.NewTopic
            import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
            import org.apache.kafka.clients.producer.KafkaProducer
            import org.apache.kafka.clients.producer.ProducerRecord
            -import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, TopicPartition, TopicPartitionReplica}
            +import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, TopicPartitionReplica}
            import org.apache.kafka.common.acl._
            import org.apache.kafka.common.config.ConfigResource
            import org.apache.kafka.common.errors._
            @@ -125,18 +126,6 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
            }, "timed out waiting for topics")
            }

            -  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
            -    try {
            -      future.get()
            -      fail("Expected CompletableFuture.get to return an exception")
            -    } catch {
            -      case e: ExecutionException =>
            -        val cause = e.getCause()
            -        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
            -          cause.getClass().getName, clazz.isInstance(cause))
            -    }
            -  }
            -
              @Test
              def testClose(): Unit = {
                val client = AdminClient.create(createConfig())

            diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
            index a21affcdc99..72b3b24b8c0 100644
            --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
            +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
            @@ -19,7 +19,6 @@ import java.util.regex.Pattern
            import java.util.{ArrayList, Collections, Properties}
            import java.time.Duration

            -import kafka.admin.AdminClient
            import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
            import kafka.common.TopicAndPartition
            import kafka.log.LogConfig
            @@ -27,7 +26,7 @@ import kafka.network.SocketServer
            import kafka.security.auth._
            import kafka.server.{BaseRequestTest, KafkaConfig}
            import kafka.utils.TestUtils
            -import org.apache.kafka.clients.admin.NewPartitions
            +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
            import org.apache.kafka.clients.consumer._
            import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
            import org.apache.kafka.clients.producer._
            @@ -1004,17 +1003,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                this.consumers.head.partitionsFor(topic)
              }

            -  @Test(expected = classOf[GroupAuthorizationException])
            +  @Test
              def testDescribeGroupApiWithNoGroupAcl() {
                addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
            -    createAdminClient().describeConsumerGroup(group)
            +    val result = createAdminClient().describeConsumerGroups(Seq(group).asJava)
            +    TestUtils.assertFutureExceptionTypeEquals(result.describedGroups().get(group), classOf[GroupAuthorizationException])
              }

              @Test
              def testDescribeGroupApiWithGroupDescribe() {
                addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), groupResource)
                addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
            -    createAdminClient().describeConsumerGroup(group)
            +    createAdminClient().describeConsumerGroups(Seq(group).asJava).describedGroups().get(group).get()
              }

              @Test
            @@ -1036,8 +1036,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), groupResource)
                this.consumers.head.assign(List(tp).asJava)
                this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
            -    val result = createAdminClient().deleteConsumerGroups(List(group))
            -    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
            +    createAdminClient().deleteConsumerGroups(Seq(group).asJava).deletedGroups().get(group).get()
              }

              @Test
            @@ -1046,14 +1045,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
                this.consumers.head.assign(List(tp).asJava)
                this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
            -    val result = createAdminClient().deleteConsumerGroups(List(group))
            -    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
            +    val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
            +    TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
              }

              @Test
              def testDeleteGroupApiWithNoDeleteGroupAcl2() {
            -    val result = createAdminClient().deleteConsumerGroups(List(group))
            -    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
            +    val result = createAdminClient().deleteConsumerGroups(Seq(group).asJava)
            +    TestUtils.assertFutureExceptionTypeEquals(result.deletedGroups().get(group), classOf[GroupAuthorizationException])
              }

              @Test
            @@ -1462,7 +1461,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
              }

              private def createAdminClient(): AdminClient = {
            -    val adminClient = AdminClient.createSimplePlaintext(brokerList)
            +    val props = new Properties()
            +    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
            +    val adminClient = AdminClient.create(props)
                adminClients += adminClient
                adminClient
              }
            diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
            index f200cc2d55c..07cbf0cf414 100644
            --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
            +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
            @@ -14,28 +14,35 @@
            package kafka.api

            import java.util.concurrent._
            -import java.util.{Collection, Collections}
            +import java.util.{Collection, Collections, Properties}

            -import kafka.admin.AdminClient
            -import kafka.server.KafkaConfig
            +import kafka.server.{BaseRequestTest, KafkaConfig}
            import kafka.utils.{CoreUtils, Logging, ShutdownableThread, TestUtils}
            import org.apache.kafka.clients.consumer._
            -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
            +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
            import org.apache.kafka.common.TopicPartition
            +import org.apache.kafka.common.protocol.ApiKeys
            +import org.apache.kafka.common.requests.{FindCoordinatorRequest, FindCoordinatorResponse}
            +import org.apache.kafka.common.security.auth.SecurityProtocol
            import org.junit.Assert._
            import org.junit.{After, Before, Ignore, Test}

            import scala.collection.JavaConverters._
            +import scala.collection.mutable.Buffer


            /**
            * Integration tests for the consumer that cover basic usage as well as server failures
            */
            -class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            +class ConsumerBounceTest extends BaseRequestTest with Logging {
            +
            + override def numBrokers: Int = 3

            val producerCount = 1
            val consumerCount = 2
            - val serverCount = 3
            +
            + val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
            + val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()

            val topic = "topic"
            val part = 0
            @@ -45,13 +52,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            val gracefulCloseTimeMs = 1000
            val executor = Executors.newScheduledThreadPool(2)

            -  // configure the servers and clients
            -  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
            -  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
            -  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
            -  this.serverConfig.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
            -  this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
            -  this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
            +  val producerConfig = new Properties
            +  val consumerConfig = new Properties
            this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
            this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
            this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
            @@ -59,8 +61,19 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
            this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

            +  def serverConfig(): Properties = {
            +    val properties = new Properties
            +    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
            +    properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
            +    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
            +    properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
            +    properties.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
            +    properties.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
            +    properties
            +  }
            +
            override def generateConfigs = {
            -    FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false)
            +    FixedPortTestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
                  .map(KafkaConfig.fromProps(_, serverConfig))
            }

            @@ -68,8 +81,26 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            override def setUp() {
              super.setUp()
            +    for (_ <- 0 until producerCount)
            +      producers += createProducer
            +
            +    for (_ <- 0 until consumerCount)
            +      consumers += createConsumer
            +
              // create the test topic with all the brokers as replicas
            -    createTopic(topic, 1, serverCount)
            +    createTopic(topic, 1, numBrokers)
            +  }
            +
            +  def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
            +    TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
            +      securityProtocol = SecurityProtocol.PLAINTEXT,
            +      props = Some(producerConfig))
            +  }
            +
            +  def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
            +    TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
            +      securityProtocol = SecurityProtocol.PLAINTEXT,
            +      props = Some(consumerConfig))
            }

            @After
            @@ -78,6 +109,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
                executor.shutdownNow()
                // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
                assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
            +      producers.foreach(_.close())
            +      consumers.foreach(_.close())
              } finally {
                super.tearDown()
              }
            @@ -173,7 +206,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            val consumer = this.consumers.head
            consumer.subscribe(Collections.singleton(newtopic))
            executor.schedule(new Runnable {
            -      def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
            +      def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
            }, 2, TimeUnit.SECONDS)
            consumer.poll(0)

            @@ -243,9 +276,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            val consumer1 = createConsumerAndReceive(dynamicGroup, false, numRecords)
            val consumer2 = createConsumerAndReceive(manualGroup, true, numRecords)

            -    val adminClient = AdminClient.createSimplePlaintext(this.brokerList)
            -    killBroker(adminClient.findCoordinator(dynamicGroup).id)
            -    killBroker(adminClient.findCoordinator(manualGroup).id)
            +    killBroker(findCoordinator(dynamicGroup))
            +    killBroker(findCoordinator(manualGroup))

            val future1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
            val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(gracefulCloseTimeMs))
            @@ -255,9 +287,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
              restartDeadBrokers()
              checkClosedState(dynamicGroup, 0)
              checkClosedState(manualGroup, numRecords)
            -    adminClient.close()
            }

            +  private def findCoordinator(group: String): Int = {
            +    val request = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
            +    val resp = connectAndSend(request, ApiKeys.FIND_COORDINATOR)
            +    val response = FindCoordinatorResponse.parse(resp, ApiKeys.FIND_COORDINATOR.latestVersion())
            +    response.node().id()
            +  }
            +
            /**
            * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since
            * there is no coordinator, but close should timeout and return. If close is invoked with a very
            @@ -288,7 +327,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
            @Test
            def testCloseDuringRebalance() {
            val topic = "closetest"
            - createTopic(topic, 10, serverCount)
            + createTopic(topic, 10, numBrokers)
            this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
            this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
            this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            @@ -355,7 +394,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {

            private def createConsumer(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {
              this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
            -    val consumer = super.createConsumer
            +    val consumer = createConsumer
              consumers += consumer
              consumer
            }
            diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
            index 9da69370d80..3b63613419d 100644
            --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
            +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
            @@ -18,6 +18,8 @@ import java.util
            import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
            import kafka.server.KafkaConfig
            import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
            +import kafka.utils.TestUtils._
            +
            import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
            import org.apache.kafka.common.acl._
            import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
            diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
            index db451961337..7b68cc0a016 100755
            --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
            +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
            @@ -25,7 +25,7 @@ import java.nio.file.{Files, StandardOpenOption}
            import java.security.cert.X509Certificate
            import java.time.Duration
            import java.util.{Collections, Properties}
            -import java.util.concurrent.{Callable, Executors, TimeUnit}
            +import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}

            import javax.net.ssl.X509TrustManager
            import kafka.api._
            @@ -41,7 +41,7 @@ import org.apache.kafka.clients.CommonClientConfigs
            import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry}
            import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
            import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

            -import org.apache.kafka.common.TopicPartition
            +import org.apache.kafka.common.{KafkaFuture, TopicPartition}
            import org.apache.kafka.common.config.ConfigResource
            import org.apache.kafka.common.header.Header
            import org.apache.kafka.common.internals.Topic
            @@ -1374,4 +1374,15 @@ object TestUtils extends Logging {
                (out.toString, err.toString)
              }

            +  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
            +    try {
            +      future.get()
            +      fail("Expected CompletableFuture.get to return an exception")
            +    } catch {
            +      case e: ExecutionException =>
            +        val cause = e.getCause()
            +        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
            +          cause.getClass().getName, clazz.isInstance(cause))
            +    }
            +  }
            }
            diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
            index 249e2c3cffd..64b23cb799c 100644
            --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
            +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
            @@ -17,12 +17,12 @@
            package org.apache.kafka.streams.integration;

            import org.apache.kafka.clients.CommonClientConfigs;
            -import org.apache.kafka.clients.admin.KafkaAdminClient;
            +import org.apache.kafka.clients.admin.AdminClient;
            +import org.apache.kafka.clients.admin.ConsumerGroupDescription;
            import org.apache.kafka.clients.consumer.ConsumerConfig;
            import org.apache.kafka.clients.producer.ProducerConfig;
            import org.apache.kafka.common.config.SslConfigs;
            import org.apache.kafka.common.config.types.Password;
            -import org.apache.kafka.common.errors.TimeoutException;
            import org.apache.kafka.common.serialization.LongDeserializer;
            import org.apache.kafka.common.serialization.LongSerializer;
            import org.apache.kafka.common.serialization.Serdes;
            @@ -61,9 +61,9 @@
            import java.util.List;
            import java.util.Map;
            import java.util.Properties;
            +import java.util.concurrent.ExecutionException;
            import java.util.concurrent.TimeUnit;

            -import kafka.admin.AdminClient;
            import kafka.tools.StreamsResetter;

            import static org.hamcrest.CoreMatchers.equalTo;
            @@ -77,20 +77,15 @@
                private static MockTime mockTime;
                private static KafkaStreams streams;
                private static AdminClient adminClient = null;
            -    private static KafkaAdminClient kafkaAdminClient = null;

                abstract Map<String, Object> getClientSslConfig();

                @AfterClass
                public static void afterClassCleanup() {
                    if (adminClient != null) {
            -            adminClient.close();
            +            adminClient.close(10, TimeUnit.SECONDS);
                        adminClient = null;
                    }
            -        if (kafkaAdminClient != null) {
            -            kafkaAdminClient.close(10, TimeUnit.SECONDS);
            -            kafkaAdminClient = null;
            -        }
                }

                private String appID = "abstract-reset-integration-test";
            @@ -103,9 +98,6 @@ private void prepareEnvironment() {
                    if (adminClient == null) {
                        adminClient = AdminClient.create(commonClientConfig);
                    }
            -        if (kafkaAdminClient == null) {
            -            kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
            -        }

                    boolean timeSet = false;
                    while (!timeSet) {
            @@ -184,8 +176,9 @@ private void prepareConfigs() {
                    @Override
                    public boolean conditionMet() {
                        try {
            -                return adminClient.describeConsumerGroup(appID, 0).consumers().get().isEmpty();
            -            } catch (final TimeoutException e) {
            +                ConsumerGroupDescription groupDescription = adminClient.describeConsumerGroups(Collections.singletonList(appID)).describedGroups().get(appID).get();
            +                return groupDescription.members().isEmpty();
            +            } catch (final ExecutionException | InterruptedException e) {
                            return false;
                        }
                    }

            ----------------------------------------------------------------
            This is an automated message from the Apache Git Service.
            To respond to the message, please log on GitHub and use the
            URL above to go to the specific comment.

            For queries about this service, please contact Infrastructure at:
            users@infra.apache.org


            People

              Assignee: omkreddy Manikumar
              Reporter: omkreddy Manikumar
              Votes: 0
              Watchers: 2

              Dates

                Created:
                Updated:
                Resolved: