  1. Kafka
  2. KAFKA-3207

StateStore seems to be writing state to one topic but restoring from another

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: 0.10.0.0
    • Component/s: streams
    • Labels:
      None
    • Environment:
      MacOS El Capitan

      Description

      The state store (I am using the in-memory state store) writes to a topic called [store-name] but restores from [job-id]-[store-name]-changelog. In StoreChangeLogger you can see that it writes to a topic named after the [store-name] passed through from the store supplier factory, but restores from the topic name above. My topology is:
      TopologyBuilder builder = new TopologyBuilder();

      SerializerAdapter<CommonKey> commonKeyAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
      SerializerAdapter<GamePlayValue> gamePlayAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
      builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter, kafkaStreamConfig.getGamePlayTopic());

      Duration activityInterval = kafkaStreamConfig.getActivityInterval();
      if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 % activityInterval.toMinutes() != 0) {
          throw new SystemFaultException(
              "The game activity interval must be a multiple of 5 minutes and divide into 24 hours current value ["
                  + activityInterval.toMinutes() + "]");
      }

      builder.addProcessor("PROCESS",
          new GameActivitySupplier(
              kafkaStreamConfig.getStoreName(),
              kafkaStreamConfig.getGameActivitySendPeriod(),
              activityInterval,
              kafkaStreamConfig.getRemoveOldestTime(),
              kafkaStreamConfig.getRemoveAbsoluteTime()),
          "SOURCE");

      SerializerAdapter<StoreValue> storeValueAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
      builder.addStateStore(
          Stores.create(kafkaStreamConfig.getStoreName())
              .withKeys(commonKeyAdapter, commonKeyAdapter)
              .withValues(storeValueAdapter, storeValueAdapter)
              .inMemory()
              .build(),
          "PROCESS");

      builder.addSink("SINK", kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
          new SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE), "PROCESS");
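
      The mismatch reported in the description can be illustrated with a minimal, self-contained sketch. It is not the Kafka Streams implementation: the class, the two helper methods, and the sample job id and store name are hypothetical, and only the two naming patterns ([store-name] and [job-id]-[store-name]-changelog) are taken from the report.

```java
public class ChangelogTopicMismatch {

    // Hypothetical helper: the topic the report says state is written to,
    // i.e. the bare store name passed in from the store supplier.
    static String observedWriteTopic(String storeName) {
        return storeName;
    }

    // Hypothetical helper: the topic the report says state is restored from,
    // following the pattern [job-id]-[store-name]-changelog.
    static String observedRestoreTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String jobId = "game-activity-job"; // illustrative value, not from the report
        String storeName = "game-store";    // illustrative value, not from the report

        String written = observedWriteTopic(storeName);
        String restored = observedRestoreTopic(jobId, storeName);

        System.out.println("writes to:     " + written);
        System.out.println("restores from: " + restored);
        // The two names differ, so records logged on write are never seen on restore.
        System.out.println("same topic:    " + written.equals(restored));
    }
}
```

      With these sample values the store would log changes to game-store but try to restore from game-activity-job-game-store-changelog, so a restarted in-memory store would come back empty unless both sides agree on one topic name.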

            People

            • Assignee: Guozhang Wang (guozhang)
            • Reporter: Tom Dearman (tom_dearman)