Kafka / KAFKA-12735

"Key Store is not initialized" after some time for a key-value store created by a transformValues processor



    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 2.6.2
    • Fix Version/s: None
    • Component/s: clients


      Dear Kafka Fellows,

      We are currently facing problems with Kafka Streams.
      We are trying to transform a set of messages into a state store.

      The functionality works, but after a certain period the application returns the error
      "Key value store is not initialized."

      We tried a lot of solutions, such as using the Kafka events or loops that wait until the store is available again, but the system is not able to heal itself.
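
The "loop until the store is available" approach mentioned above can be sketched as a bounded retry with a fixed pause. This is only an illustration under assumptions, not the code from this application: `StoreRetry`, `fetchWithRetry`, the attempt count, and the pause are hypothetical, and the `Supplier` stands in for a lookup such as `myIcpMessagesStream.store(...)` that throws while the store is not queryable.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper: retries a lookup a bounded number of times.
public final class StoreRetry {

    private StoreRetry() {
    }

    /**
     * Calls lookup up to maxAttempts times, pausing pauseMillis between failed attempts.
     * Returns the first non-null result, or Optional.empty() if every attempt
     * threw, returned null, or the waiting thread was interrupted.
     */
    public static <T> Optional<T> fetchWithRetry(final Supplier<T> lookup,
                                                 final int maxAttempts,
                                                 final long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                final T result = lookup.get();
                if (result != null) {
                    return Optional.of(result);
                }
            } catch (final RuntimeException e) {
                // Assumed case: the store is not queryable yet; try again.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (final InterruptedException e) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
```

A bounded retry like this can only bridge a phase in which the store is temporarily unavailable. If the streams instance itself has stopped, the lookup will presumably keep failing no matter how often it is repeated, which would match the behaviour described above.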

      Some of our colleagues use the Kubernetes health check to restart the application when this issue comes up, but we do not think this is a proper solution.
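
For reference, the health-check workaround usually comes down to deriving a liveness flag from the state of the streams instance. The following is a minimal sketch under assumptions: `StreamsLiveness` and `isAlive` are hypothetical names, and the state name is assumed to come from something like `kafkaStreams.state().name()`. The three names listed are states of `KafkaStreams.State` in the 2.6 line that an instance does not leave to become `RUNNING` again.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: maps a Kafka Streams state name to a liveness flag.
public final class StreamsLiveness {

    // States from which the instance does not return to RUNNING on its own.
    private static final Set<String> DEAD_STATES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("PENDING_SHUTDOWN", "NOT_RUNNING", "ERROR")));

    private StreamsLiveness() {
    }

    /** Returns true unless the state name is null or one of the dead states. */
    public static boolean isAlive(final String stateName) {
        return stateName != null && !DEAD_STATES.contains(stateName);
    }
}
```

A liveness probe built on such a flag restarts the pod only when the instance is in one of the dead states, and leaves it alone during a rebalance.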

      What do you recommend?



      Thanks a lot for your help


      Our Code


      @Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()")
      public GenericCacheable<List<MyIcpNotification>> getMyIcpNotificationsForUser(final String uuid, final String emailAddress) throws InterruptedException {
          if (!hasText(emailAddress)) {
              LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an invalid email address.", uuid);
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
          if (keyValueStore == null) {
              initializeStore(uuid); // call missing from the report, inferred from the helper below
          }
          if (keyValueStore == null) {
              LOGGER.error("[{}]: Key value store is not initialized.", uuid);
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
          final List<Command<MyIcpPayload>> commandList = keyValueStore.get(emailAddress);
          if (commandList == null) {
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
          final List<MyIcpNotification> list = commandList
                  // ... (conversion to MyIcpNotification missing from the report)
                  ;
          return new GenericCacheable<>(list, LocalDateTime.now());
      }

      private void initializeStore(final String uuid) throws InterruptedException {
          int counter = 0;
          while (counter < 5) {
              try {
                  keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore());
                  // ... (loop exit missing from the report)
              } catch (final Exception e) {
                  LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage());
                  // ... (wait and counter increment missing from the report)
              }
          }
      }

      public KafkaStreams myIcpMessagesStream(@Qualifier("myIcpEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
          final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());
          final StoreBuilder<KeyValueStore<String, List<Command<MyIcpPayload>>>> keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
          // ... (lines missing from the report; the store builder is presumably registered on the builder here)
          myicpQueryStreamBuilder
                  .stream(kafkaTopicNames.getMyIcpMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
                  .transformValues(() -> commandTransformer, storeName);
          final KafkaStreams kafkaStreams = new KafkaStreams(myicpQueryStreamBuilder.build(), Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
          // ... (any lines between construction and return are missing from the report)
          return kafkaStreams;
      }



      public class CommandTransformer implements ValueTransformer<Command<MyIcpPayload>, List<Command<MyIcpPayload>>> {
          private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class);
          private String storeName;
          private int maxStoreSize;
          private KeyValueStore<String, List<Command<MyIcpPayload>>> keyValueStore;

          public void init(final ProcessorContext context) {
              keyValueStore = (KeyValueStore<String, List<Command<MyIcpPayload>>>) context.getStateStore(storeName);
          }

          @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress")
          public List<Command<MyIcpPayload>> transform(final Command<MyIcpPayload> value) {
              if (value == null) {
                  return Collections.emptyList();
              }
              final List<Command<MyIcpPayload>> listForUser = getCommandListForUser(value);
              if (isInvalidValue(value, listForUser)) {
                  return listForUser;
              }
              if (listForUser.size() >= maxStoreSize) {
                  // ... (handling of a full list missing from the report)
              }
              // ... (lines missing from the report)
              LOGGER.debug("[{}] current list [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size());
              keyValueStore.put(value.getPayload().getUser().getEmailAddress(), listForUser);
              LOGGER.debug("[{}] list after update [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size());
              return listForUser;
          }

          private boolean isInvalidValue(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) {
              if (uuidAlreadyPresent(value, listForUser)) {
                  return true;
              }
              final ZonedDateTime oldestDate = getOldestDateInList(listForUser);
              return nonNull(oldestDate) && oldestDate.isAfter(value.getHeader().getTimestamp());
          }

          private ZonedDateTime getOldestDateInList(final List<Command<MyIcpPayload>> listForUser) {
              return listForUser
                      .stream()
                      .map(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getTimestamp())
                      // ... (reduction to the oldest timestamp missing from the report)
                      ;
          }

          private boolean uuidAlreadyPresent(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) {
              return listForUser.stream().anyMatch(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getUuid().equalsIgnoreCase(value.getHeader().getUuid()));
          }

          private List<Command<MyIcpPayload>> getCommandListForUser(final Command<MyIcpPayload> value) {
              List<Command<MyIcpPayload>> listForUser = keyValueStore.get(value.getPayload().getUser().getEmailAddress());
              if (isNull(listForUser)) {
                  listForUser = new ArrayList<>();
              }
              return listForUser;
          }

          public void close() {
              // do nothing here
          }
      }





            Assignee: Unassigned
            Reporter: Thomas Hein (thein)