  1. Kafka
  2. KAFKA-12735

"Key Store is not initialized" after some time for a key-value store created by a transformValues processor


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 2.6.2
    • Fix Version/s: None
    • Component/s: clients

    Description

      Dear Kafka Fellows,

      we are currently facing problems with Kafka Streams.
      We try to transform a set of messages into a state store.

      The functionality works, but after a certain period the application returns the error
      "Key value store is not initialized."

      We tried a lot of solutions, such as using the Kafka events or loops that wait until the store is available again, but the system is not able to heal itself.

      Some of our colleagues use the Kubernetes health check to restart the application when this issue comes up, but we think this is not a proper solution.

      What do you recommend?

      Thanks a lot for your help.

      Our code:

       

      @Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = "#result == null || #result.cachedObject == null || #result.cachedObject.isEmpty()")
      public GenericCacheable<List<MyIcpNotification>> getMyIcpNotificationsForUser(final String uuid, final String emailAddress) throws InterruptedException {
          if (!hasText(emailAddress)) {
              LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an invalid email address.", uuid);
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
      
          if (keyValueStore == null) {
              initializeStore(uuid);
          }
      
          if (keyValueStore == null) {
              LOGGER.error("[{}]: Key value store is not initialized.", uuid);
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
      
          final List<Command<MyIcpPayload>> commandList = keyValueStore.get(emailAddress);
          if (commandList == null) {
              return new GenericCacheable<>(Collections.emptyList(), null);
          }
      
          //@formatter:off
          final List<MyIcpNotification> list = commandList
                  .stream()
                  .map(this::mapToNotification)
                  .collect(Collectors.toList());
          //@formatter:on
      
          return new GenericCacheable<>(list, LocalDateTime.now());
      }
      
      private void initializeStore(final String uuid) throws InterruptedException {
          int counter = 0;
          while (counter < 5) {
              try {
                  keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore());
                  return;
              } catch (final Exception e) {
                  LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage());
                  Thread.sleep(1000);
                  counter++;
              }
          }
      }
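
      One possible direction, offered only as a sketch: rather than keeping the store handle in a field forever, drop the cached handle whenever a lookup or read fails, so that the next attempt fetches a fresh one. In Kafka Streams a handle obtained from `KafkaStreams.store(...)` can stop being usable (an `InvalidStateStoreException` is thrown) when the instance rebalances or restarts. The example below uses only the JDK. `RefreshingHandle` and `StoreNotReadyException` are hypothetical names (the latter stands in for `InvalidStateStoreException`), and the retry count and back-off are arbitrary assumptions.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for Kafka Streams' InvalidStateStoreException. */
class StoreNotReadyException extends RuntimeException {
    StoreNotReadyException(final String message) {
        super(message);
    }
}

/** Hypothetical helper: caches a handle, but re-acquires it after any failed lookup or read. */
class RefreshingHandle<S> {
    private final Supplier<S> lookup;   // assumed to wrap the real store lookup
    private final int maxAttempts;
    private final long backoffMillis;
    private S cached;

    RefreshingHandle(final Supplier<S> lookup, final int maxAttempts, final long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.lookup = lookup;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    /** Runs the query against the handle; on failure, discards the handle and retries. */
    synchronized <R> R read(final Function<S, R> query) {
        StoreNotReadyException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (cached == null) {
                    cached = lookup.get();
                }
                return query.apply(cached);
            } catch (final StoreNotReadyException e) {
                cached = null;          // never keep a handle that just failed
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (final InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        throw e;        // stop retrying if the thread was interrupted
                    }
                }
            }
        }
        throw last;                     // surface the cause instead of swallowing it
    }
}
```

      With the code above, the lookup would roughly be `() -> myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore())` and the read `handle.read(store -> store.get(emailAddress))`. Rethrowing the last exception, rather than logging it at debug level, keeps the underlying cause visible.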
      

       

      public KafkaStreams myIcpMessagesStream(@Qualifier("myIcpEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
          final StreamsBuilder myicpQueryStreamBuilder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());
      
          final StoreBuilder<KeyValueStore<String, List<Command<MyIcpPayload>>>> keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
          myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);
      
          //@formatter:off
          myicpQueryStreamBuilder
                  .stream(kafkaTopicNames.getMyIcpMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
                  .mapValues(this::mapPayloadToMyIcpPayload)
                  .transformValues(() -> commandTransformer, storeName);
          //@formatter:on
      
          final KafkaStreams kafkaStreams = new KafkaStreams(myicpQueryStreamBuilder.build(), Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
          kafkaStreams.start();
      
          return kafkaStreams;
      }
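
      It may also be worth checking the supplier passed to `transformValues`. The Kafka Streams documentation asks that a `ValueTransformerSupplier` return a new transformer instance on every `get()` call, because one instance is created per stream task. The lambda `() -> commandTransformer` above hands out the same object each time. Whether this is related to the reported error is not established here. The JDK-only sketch below just illustrates the difference; `FakeTransformer` and `SupplierDemo` are hypothetical stand-ins, not part of the reported code.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a stateful transformer such as CommandTransformer. */
class FakeTransformer {
    private String storeHandle;   // per-task state, set in init()

    void init(final String taskId) {
        this.storeHandle = "store-of-" + taskId;
    }

    String storeHandle() {
        return storeHandle;
    }
}

class SupplierDemo {
    /**
     * Simulates a framework that asks the supplier for one transformer per task
     * and initialises each of them. Returns the state seen by the first task.
     */
    static String firstTaskHandleAfterTwoInits(final Supplier<FakeTransformer> supplier) {
        final FakeTransformer task0 = supplier.get();
        task0.init("task-0");
        final FakeTransformer task1 = supplier.get();
        task1.init("task-1");
        return task0.storeHandle();
    }
}
```

      With a shared instance (`final FakeTransformer shared = new FakeTransformer();` and the supplier `() -> shared`), the second `init()` overwrites the first task's state and the method returns `store-of-task-1`. With `FakeTransformer::new`, each task keeps its own state and the method returns `store-of-task-0`.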
      

       

       

      public class CommandTransformer implements ValueTransformer<Command<MyIcpPayload>, List<Command<MyIcpPayload>>> {
          private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class);
      
          @Value("${ifx.notificationService.myicp.storeName}")
          private String storeName;
      
          @Value("${ifx.notificationService.myicp.maxStoreSize}")
          private int maxStoreSize;
      
          private KeyValueStore<String, List<Command<MyIcpPayload>>> keyValueStore;
      
          @Override
          public void init(final ProcessorContext context) {
              keyValueStore = (KeyValueStore<String, List<Command<MyIcpPayload>>>) context.getStateStore(storeName);
          }
      
          @Override
          @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress")
          public List<Command<MyIcpPayload>> transform(final Command<MyIcpPayload> value) {
              if (value == null) {
                  return Collections.emptyList();
              }
      
              final List<Command<MyIcpPayload>> listForUser = getCommandListForUser(value);
      
              if (isInvalidValue(value, listForUser)) {
                  return listForUser;
              }
      
              if (listForUser.size() >= maxStoreSize) {
                  listForUser.remove(0);
              }
      
              LOGGER.debug("[{}] current list [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size());
      
              listForUser.add(value);
              keyValueStore.put(value.getPayload().getUser().getEmailAddress(), listForUser);
      
              LOGGER.debug("[{}] list after update [{}]", value.getPayload().getUser().getEmailAddress(), listForUser.size());
      
              return listForUser;
          }
      
          private boolean isInvalidValue(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) {
              if (uuidAlreadyPresent(value, listForUser)) {
                  return true;
              }
      
              final ZonedDateTime oldestDate = getOldestDateInList(listForUser);
      
              return nonNull(oldestDate) && oldestDate.isAfter(value.getHeader().getTimestamp());
          }
      
          private ZonedDateTime getOldestDateInList(final List<Command<MyIcpPayload>> listForUser) {
              // Note: max() returns the most recent timestamp in the list, despite the method name.
              //@formatter:off
              return listForUser
                      .stream()
                      .map(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getTimestamp())
                      .max(ZonedDateTime::compareTo)
                      .orElse(null);
              //@formatter:on
          }
      
          private boolean uuidAlreadyPresent(final Command<MyIcpPayload> value, final List<Command<MyIcpPayload>> listForUser) {
              return listForUser.stream().anyMatch(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getUuid().equalsIgnoreCase(value.getHeader().getUuid()));
          }
      
          private List<Command<MyIcpPayload>> getCommandListForUser(final Command<MyIcpPayload> value) {
              List<Command<MyIcpPayload>> listForUser = keyValueStore.get(value.getPayload().getUser().getEmailAddress());
              if (isNull(listForUser)) {
                  listForUser = new ArrayList<>();
              }
      
              return listForUser;
          }
      
          @Override
          public void close() {
              // do nothing here
          }
      }
      

       


          People

            Assignee: Unassigned
            Reporter: Thomas Hein (thein)
            Votes: 0
            Watchers: 2
