Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8602

StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1
    • Fix Version/s: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
    • Component/s: streams
    • Labels:
      None

      Description

      StreamThread dies with the following exception:

      java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
      	at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
      

      The reason is that the restore consumer is not subscribed to any topic. This happens when a StreamThread gets assigned standby tasks for sub-topologies with just state stores with disabled logging.

      To reproduce the bug start two applications with one StreamThread and one standby replica each and the following topology. The input topic should have two partitions:

      final StreamsBuilder builder = new StreamsBuilder();
      final String stateStoreName = "myTransformState";
      final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
          Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
              Serdes.Integer(),
              Serdes.Integer())
              .withLoggingDisabled();
      builder.addStateStore(keyValueStoreBuilder);
      builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
          .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
              private KeyValueStore<Integer, Integer> state;
      
                  @SuppressWarnings("unchecked")
                  @Override
                  public void init(final ProcessorContext context) {
                      state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
                  }
      
                  @Override
                  public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
                      final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
                      return result;
                  }
      
                  @Override
                  public void close() {}
              }, stateStoreName)
              .to(OUTPUT_TOPIC);
      

      Both StreamThreads should die with the above exception.

      The root cause is that standby tasks are created although all state stores of the sub-topology have logging disabled.

        Attachments

          Activity

            People

            • Assignee:
              cadonna Bruno Cadonna
              Reporter:
              cadonna Bruno Cadonna
            • Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: