Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-5651 KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines
  3. KAFKA-5650

Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 1.0.0
    • None
    • None

    Description

      As per KIP-182:
      A new interface will be added:

      /**
       * Implementations of this will provide the ability to wrap a given StateStore
       * with or without caching/loggging etc.
       */
      public interface StateStoreBuilder<T extends StateStore> {
       
          StateStoreBuilder<T> withCachingEnabled();
          StateStoreBuilder<T> withCachingDisabled();
          StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
          StateStoreBuilder<T> withLoggingDisabled();
          T build();
      }
      

      This interface will be used to wrap stores with caching, logging etc.
      Additionally some convenience methods on the Stores class:

      public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
                                                                                           final Serde<K> keySerde,
                                                                                           final Serde<V> valueSerde)
       
      public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
                                                                                      final Serde<K> keySerde,
                                                                                      final Serde<V> valueSerde)
       
      public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
                                                                      final int capacity,
                                                                      final Serde<K> keySerde,
                                                                      final Serde<V> valueSerde)
       
      public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
                                                                                  final Windows windows,
                                                                                  final Serde<K> keySerde,
                                                                                  final Serde<V> valueSerde)
       
      public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
                                                                                    final SessionWindows windows,
                                                                                    final Serde<K> keySerde,
                                                                                    final Serde<V> valueSerde)
       
      /**
       *  The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
       *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
       */ 
      public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)
       
      public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
       
      public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
      

      Attachments

        Issue Links

          Activity

            People

              damianguy Damian Guy
              damianguy Damian Guy
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: