KAFKA-9005

Kafka stream: “TopicAuthorizationException: Not authorized to access topics” for an internal state store


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Not A Bug
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      Java: OpenJDK 11
      Kafka server: 2.2.0
      Kafka streams lib: 2.3.0

      I have also posted this as a Stack Overflow question [here|https://stackoverflow.com/questions/58299827/kafka-stream-topicauthorizationexception-not-authorized-to-access-topics-for].

       

      I am trying to deploy my Kafka Streams application in a Docker container, and it fails with a TopicAuthorizationException while trying to create an internal state store. It works well locally. The main difference is that, on the server, the application connects to a server-deployed Kafka cluster and authenticates with Kerberos. I do not understand the link between authentication and the local state stores.

      My topology looks like this:

      StreamsBuilder builder = new StreamsBuilder();

      // Stream from the source topic
      KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic, Consumed
              .with(Serdes.serdeFrom(String.class), INPUT_SERDE));

      // Group per room (key) and time window
      TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
              .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

      // Collect the messages of each window into a list and emit only the final result per window
      KStream<Windowed<String>, WindowedMessages> grouped = windowed
              .aggregate(WindowedMessages::new,
                      (key, value, aggregate) -> aggregate.add(value),
                      Materialized.with(Serdes.String(), Serdes.serdeFrom(windowSerializer, windowSerializer)))
              .suppress(Suppressed.untilWindowCloses(unbounded()))
              .toStream();

      // Filter
      KStream<Windowed<String>, FilterResult> filtered = grouped
              .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

      // Map back to the original (un-windowed) form
      KStream<String, OutputPayload> reduced = filtered
              .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages, Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                      .getMessages()
                      .stream().map(payload -> new KeyValue<>(key.key(), payload))
                      .collect(toList()));

      // Write to the target topic
      reduced.to(sinkTopic, Produced
              .with(Serdes.serdeFrom(String.class), SERDE));

      return builder.build();
      

      The topology receives a stream of messages, windows it, aggregates all the messages of each window into a list, keeps only the final version of that list by using 'Suppressed', and then flatMaps the result to forward it to another topic.
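To make the windowing and aggregation step concrete, here is a minimal plain-Java sketch that does not use Kafka at all. It assumes tumbling windows (which is what `TimeWindows.of(size)` gives when no advance interval is set) and non-negative timestamps. The class name, the `room-1` key, and the `key@windowStart` string format are hypothetical and only serve the illustration. Grace periods and suppression are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp
    // (assumes non-negative epoch milliseconds).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return (timestampMs / windowSizeMs) * windowSizeMs;
    }

    // Collects the values of one key into a list per window, roughly what
    // groupByKey().windowedBy(...).aggregate(...) does in the topology above.
    static Map<String, List<String>> aggregate(String key, long[] timestamps,
                                               String[] values, long windowSizeMs) {
        Map<String, List<String>> perWindow = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            // Hypothetical windowed-key format, for illustration only.
            String windowedKey = key + "@" + windowStart(timestamps[i], windowSizeMs);
            perWindow.computeIfAbsent(windowedKey, k -> new ArrayList<>()).add(values[i]);
        }
        return perWindow;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = aggregate(
                "room-1",
                new long[] {0L, 400L, 1000L, 1999L, 2000L},
                new String[] {"a", "b", "c", "d", "e"},
                1000L);
        System.out.println(result);
    }
}
```

In the real topology, each of these per-window lists is held in a state store until the window closes, which is why Kafka Streams needs internal topics to back those stores.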

      Every time, I get this kind of exception:

       

      org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
      > Error message was: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
      2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - []
      org.apache.kafka.streams.errors.StreamsException: Could not create topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog.
          at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212)
          at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226)
          at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104)
          at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971)
          at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
          at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
          at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
          at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
          at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
          at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
      Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
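The topic named in the StreamsException follows the Kafka Streams naming convention for changelog topics: `<application.id>-<store name>-changelog`. This is most likely the link between authentication and the "local" store. The store is backed by a changelog topic on the broker, which Streams checks and creates during partition assignment (the `InternalTopicManager` frames in the trace), so the authenticated principal needs permissions on topics with that prefix. A minimal sketch of the naming rule, assuming the `application.id` is `filterer` (inferred from the thread name in the log, not stated in the report); the class and method names are hypothetical:

```java
final class ChangelogTopicNames {

    // Kafka Streams names a store's changelog topic
    // "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Assumption: application.id is "filterer", inferred from the thread name in the log.
        String applicationId = "filterer";
        // Store name generated for the suppress() buffer, as it appears in the trace.
        String storeName = "KTABLE-SUPPRESS-STATE-STORE-0000000005";
        // Prints: filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog
        System.out.println(changelogTopic(applicationId, storeName));
    }
}
```

Because every internal topic of the application shares the `application.id` prefix, a single prefixed authorization rule on that prefix would typically cover the aggregate store's changelog as well as the suppress buffer's.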

          People

            Assignee: Unassigned
            Reporter: Arnaud Villevieille (arnaud.villevieille)
            Votes: 0
            Watchers: 4
