Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.2.0
    • Labels:
      None

      Description

      Flink offers state abstractions for user functions in order to guarantee fault-tolerant processing of streams. Users can work with both non-partitioned (Checkpointed interface) and partitioned state (getRuntimeContext().getState(ValueStateDescriptor) and other variants).

      The partitioned state interface provides access to different types of state that are all scoped to the key of the current input element. This type of state can only be used on a KeyedStream, which is created via stream.keyBy().

      Currently, all of this state is internal to Flink and used in order to provide processing guarantees in failure cases (e.g. exactly-once processing).

      The goal of Queryable State is to expose this state outside of Flink by supporting queries against the partitioned key value state.

      This will help to eliminate the need for distributed operations/transactions with external systems such as key-value stores, which are often the bottleneck in practice. Exposing the local state to the outside moves a good part of the database work into the stream processor, allowing both high-throughput queries and immediate access to the computed state.

      This is the initial design doc for the feature: https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g. Feel free to comment.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2051

          FLINK-3779 Add support for queryable state

          First of all, thanks to @tillrohrmann, @aljoscha, and @StephanEwen for discussions during and before implementing this first version. The initial design document can be found here: https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g

          *In a nutshell, this feature allows users to query Flink's managed partitioned state from outside of Flink. This eliminates the need for distributed operations/transactions with external systems such as key-value stores which are often the bottleneck in practice.*

          1. APIs
            1.1. QueryableStateStream

          The following methods have been added as `@PublicEvolving` to `KeyedStream`:

          ```java
          // ValueState
          QueryableStateStream asQueryableState(
              String queryableStateName,
              ValueStateDescriptor stateDescriptor)

          // Shortcut for explicit ValueStateDescriptor variant
          QueryableStateStream asQueryableState(String queryableStateName)

          // ListState
          QueryableStateStream asQueryableState(
              String queryableStateName,
              ListStateDescriptor stateDescriptor)

          // FoldingState
          QueryableStateStream asQueryableState(
              String queryableStateName,
              FoldingStateDescriptor stateDescriptor)

          // ReducingState
          QueryableStateStream asQueryableState(
              String queryableStateName,
              ReducingStateDescriptor stateDescriptor)
          ```

          A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and currently only holds the value and key serializer for the queryable state stream. It's comparable to a sink, after which you cannot do further transformations.

          The `QueryableStateStream` gets translated to an operator, which uses all incoming records to update the queryable state instance. If you have a program like `stream.keyBy(0).asQueryableState("query-name")`, all records of the keyed stream will be used to update the state instance, either via `update` for `ValueState` or `add` for `AppendingState`. This acts like the Scala API's `flatMapWithState`. For an example, take a look at `QueryableStateITCase` in `flink-tests`.
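The difference between `update` and `add` can be illustrated with a minimal sketch in plain Java. This is not the Flink operator: the class `QueryableStateOperatorSketch` and its methods are hypothetical stand-ins, and plain maps take the place of the keyed state backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the operator behind asQueryableState(...); not Flink code.
class QueryableStateOperatorSketch<K, V> {
    // Models ValueState: one value per key, replaced by every incoming record.
    private final Map<K, V> valueState = new HashMap<>();
    // Models an AppendingState such as ListState: records accumulate per key.
    private final Map<K, List<V>> listState = new HashMap<>();

    // Corresponds to state.update(record) for the current key.
    void processAsValueState(K key, V record) {
        valueState.put(key, record);
    }

    // Corresponds to state.add(record) for the current key.
    void processAsListState(K key, V record) {
        listState.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }

    // What a query for the key would see in the ValueState case.
    V queryValue(K key) {
        return valueState.get(key);
    }

    // What a query for the key would see in the ListState case.
    List<V> queryList(K key) {
        return listState.getOrDefault(key, Collections.emptyList());
    }
}
```

With `ValueState` semantics a query only ever sees the latest record for a key, whereas with `ListState` semantics it sees every record received for that key so far.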

          I understand that these are quite a few methods to add to the public APIs, but I am not aware of another way to do it if we want to ensure type safety when providing the state descriptor. @aljoscha, you have quite some experience with designing APIs. Is there maybe a better way? And what do you (and others) think about the name `QueryableStateStream`? We could also go for the shorter `QueryableState` or something else even. I'm open to suggestions.

            1.2. QueryableStateClient

          This is the client used for queries against the KvState instances. The query method is this:

          ```java
          Future<byte[]> getKvState(
              JobID jobID,
              String queryableStateName,
              int keyHashCode,
              byte[] serializedKeyAndNamespace)
          ```

          A call to the method returns a Future that eventually holds the serialized state value for the queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The `keyHashCode` is the hash code as returned by `Object.hashCode()` and the `serializedKeyAndNamespace` is the serialized key and namespace. The client is asynchronous and can be shared by multiple threads. An example can be seen in `QueryableStateITCase` (in `flink-tests`).

          The current implementation is low-level in the sense that it only works with serialized data both for providing the key/namespace and the returned results. It's the responsibility of the user (or some follow-up utilities) to set up the serializers for this. The nice thing about this is that the query services don't have to get into the business of worrying about any class loading issues etc.

          There are some serialization utils for key/namespace and value serialization included in `KvStateRequestSerializer`.
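To make the two key-related arguments of `getKvState` more concrete, here is a rough sketch of how a caller might prepare them. The helper `KeyAndNamespaceSketch` is hypothetical and the byte layout (key followed by namespace, both written with `DataOutputStream.writeUTF`) is an assumption for illustration only; the real format is whatever `KvStateRequestSerializer` produces with the job's serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper; the byte layout below is an assumption, not Flink's format.
class KeyAndNamespaceSketch {

    // Writes the key followed by the namespace into a single byte array.
    static byte[] serialize(String key, String namespace) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key);
            out.writeUTF(namespace);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the key and the namespace back in the order they were written.
    static String[] deserialize(byte[] serializedKeyAndNamespace) {
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(serializedKeyAndNamespace))) {
            String key = in.readUTF();
            String namespace = in.readUTF();
            return new String[] {key, namespace};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The hash code that would be passed as keyHashCode alongside the bytes.
    static int keyHashCode(String key) {
        return key.hashCode();
    }
}
```

The point of the sketch is only that the caller, not the query service, owns serialization: the service sees an `int` and an opaque `byte[]`.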

          2. Implementation

          The following sections highlight the main changes/additions.

            2.1. Added `setQueryable(String)` to `StateDescriptor`

          KvState instances are published for queries when they have a queryable state name set (see below). For this purpose, I've introduced the `setQueryable(String)` method to the `StateDescriptor` interface. The provided name is different from the state descriptor name we already had before. For queries, only the name provided in `setQueryable(String)` is relevant.

          The name needs to be unique per job. If this is not the case, the job fails at runtime with an unrecoverable exception. Unfortunately, this cannot be checked before submitting the job.

            2.2. Added `byte[] getSerializedValue(byte[] serializedKeyAndNamespace)` to `KvState`

          This method is implemented by all KvState instances for queries. Since all state instances have references to their serializers, they take care of serialization so that the caller does not have to.

          For Java heap-backed state, we deserialize the key and namespace, access the state for the key/namespace, and serialize the result. For RocksDB-backed state, we can directly use the `serializedKeyAndNamespace` to access the serialized result.

          Furthermore, with the RocksDB state backend we don't have to worry about concurrent accesses to the state instance, whereas we need `ConcurrentHashMap`s for the internal key/namespace maps of `AbstractHeapState` if the state instance is queryable.
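The two lookup paths can be contrasted in a small plain-Java sketch. Everything here is a simplification made for illustration: `SerializedLookupSketch` is a hypothetical class, keys are UTF-8 strings, values are longs, the namespace is left out, and a byte-keyed map stands in for RocksDB.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two lookup paths; not Flink's state backends.
class SerializedLookupSketch {
    // Heap-style state: objects live on the heap, so a query has to deserialize the key
    // and serialize the value. A concurrent map is used because the querying thread
    // reads while the task thread writes.
    private final Map<String, Long> heapState = new ConcurrentHashMap<>();
    // RocksDB-style state: keys and values are already stored as bytes.
    private final Map<ByteBuffer, byte[]> byteState = new ConcurrentHashMap<>();

    void put(String key, long value) {
        heapState.put(key, value);
        byteState.put(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)), toBytes(value));
    }

    // Heap path: deserialize key, look up the object, serialize the result.
    byte[] getSerializedValueFromHeap(byte[] serializedKey) {
        String key = new String(serializedKey, StandardCharsets.UTF_8);
        Long value = heapState.get(key);
        return value == null ? null : toBytes(value);
    }

    // Byte path: the serialized key is used directly for the lookup.
    byte[] getSerializedValueFromBytes(byte[] serializedKey) {
        return byteState.get(ByteBuffer.wrap(serializedKey));
    }

    static byte[] toBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long fromBytes(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

Both paths return the same bytes for the same key; they differ only in how much work happens on the querying thread.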

            2.3. Added `KvStateRegistry` to TaskManager

          This is a very simple registry on the TaskManager. The `AbstractStateBackend` registers `KvState` instances at runtime on:

          • first call to `getPartitionedState()`, which creates the `State` instance, or
          • `injectKeyValueStateSnapshots()`.

          At the moment, we essentially have two variants for the state backends: either a RocksDB-backed or a Java heap-backed backend (`FileSystemStateBackend`, `MemoryStateBackend`).

          A note on restoring: RocksDB state will only be published for queries on `getPartitionedState()` whereas Java heap-backed state is already published on `injectKeyValueStateSnapshots()`. This has to do with the way that the RocksDB state backend organizes the state internally. I didn't want to change a lot there and I think it's a fair compromise for the first version.

            2.4. Added `KvStateLocationRegistry` to JobManager

          The `KvStateRegistry` of each TaskManager reports the registered state instances to the JobManager, where they are aggregated by the `KvStateLocationRegistry`. The purpose of this is to allow clients to query the JobManager for location information about the state they want to query. There is one `KvStateLocation` for each registered queryable state, which maps each key group index (currently the subtask index) to the server address holding the state instance.

          With this, the client can figure out which TaskManager to query for each key. Communication between the client and the JobManager is only needed when the location is unknown or out of sync.
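A client-side location cache along these lines could look roughly like the following sketch. All names are hypothetical, the JobManager lookup is modelled as a plain `Function`, and mapping the key hash to a key group index with `Math.floorMod` is an assumption made for the example; the actual assignment is defined by Flink's runtime.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical client-side cache of location information; not Flink code.
class LocationCacheSketch {
    // Queryable state name -> server address per key group index.
    private final Map<String, String[]> cache = new HashMap<>();
    // Stand-in for the lookup against the JobManager.
    private final Function<String, String[]> jobManagerLookup;
    private int lookups = 0;

    LocationCacheSketch(Function<String, String[]> jobManagerLookup) {
        this.jobManagerLookup = jobManagerLookup;
    }

    // Returns the server address to query for the given key hash.
    String serverFor(String queryableStateName, int keyHashCode) {
        String[] addresses = cache.get(queryableStateName);
        if (addresses == null) {
            // Unknown location: this is the only case that contacts the JobManager.
            lookups++;
            addresses = jobManagerLookup.apply(queryableStateName);
            cache.put(queryableStateName, addresses);
        }
        // Assumption: key group index derived from the hash via floorMod.
        int keyGroupIndex = Math.floorMod(keyHashCode, addresses.length);
        return addresses[keyGroupIndex];
    }

    // Called when a cached location turned out to be out of sync.
    void invalidate(String queryableStateName) {
        cache.remove(queryableStateName);
    }

    int getLookups() {
        return lookups;
    }
}
```

Repeated queries for the same state name are served from the cache; only a miss or an explicit `invalidate` triggers another lookup.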

          The lookup of `KvStateLocation` instances happens via Akka.

            2.5. Added `KvStateClient` and `KvStateServer` for network transfers

          The `KvStateClient` and `KvStateServer` are responsible for the actual data exchange via TCP. Each TaskManager runs a single `KvStateServer`, which queries the local `KvStateRegistry` on incoming requests.

          Connections are established and released by the client. The server closes a connection only on failures. Each client connection can be shared by multiple threads.

          Both client and server keep track of their respective statistics (how many requests were made and how long they took).
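As an illustration of this kind of bookkeeping, a thread-safe counter object could be sketched as follows. The class `RequestStatsSketch` and its method names are hypothetical and not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical request statistics holder; safe to update from multiple network threads.
class RequestStatsSketch {
    private final AtomicLong numRequests = new AtomicLong();
    private final AtomicLong numSuccessful = new AtomicLong();
    private final AtomicLong totalDurationMillis = new AtomicLong();

    // Called when a request is sent (client) or received (server).
    void reportRequest() {
        numRequests.incrementAndGet();
    }

    // Called when a request completed successfully after the given duration.
    void reportSuccessfulRequest(long durationMillis) {
        numSuccessful.incrementAndGet();
        totalDurationMillis.addAndGet(durationMillis);
    }

    long getNumRequests() {
        return numRequests.get();
    }

    long getNumSuccessful() {
        return numSuccessful.get();
    }

    // Average duration of successful requests, or 0.0 if there were none.
    double getAverageDurationMillis() {
        long successful = numSuccessful.get();
        return successful == 0 ? 0.0 : (double) totalDurationMillis.get() / successful;
    }
}
```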

          3. Limitations
          • User docs are sparse. I wanted to wait for some initial feedback with this PR before writing anything.
          • The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register queryable state on startup and unregister it on dispose. In future versions, it is desirable to decouple this in order to allow queries after a task finishes and to speed up recovery via state replication.
          • Notifications about available `KvState` happen via a simple `tell`. This should be improved to be more robust with `ask`s and acknowledgements. This was kept simple on purpose in anticipation of possible state replication improvements (see the previous point), which probably need a different model of reporting available state.
          • The server and client keep track of statistics for queries. These are currently disabled by default as they would not be exposed anywhere. As soon as there is better support to publish these numbers via the Metrics system, we should enable the stats.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 3779-queryable_state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2051.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2051


          commit c1e8466f09787d229b78019e4d33d1c64932c74e
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T11:42:39Z

          FLINK-3779 [runtime] Add getSerializedValue(byte[]) to KvState

          [statebackend-rocksdb, core, streaming-java]

          • Adds the getSerializedValue(byte[]) to KvState, which is used to query single
            KvState instances. The serialization business is left to the KvState in order
            to not burden the accessor – e.g. the querying network thread – with setting
            up/accessing the serializers.
          • Adds a queryable flag to the StateDescriptor. State that sets a queryable state
            name will be published for queries to the KvStateRegistry.
          • Prohibits null namespace and enforces VoidNamespace instead. This makes things
            more explicit. Furthermore, the concurrent map used for queryable memory state
            does not allow working with null keys.

          commit cd651a651c247e3638d81c3db32b9619a35aaf2a
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T12:03:35Z

          FLINK-3779 [runtime] Add KvStateRegistry for queryable KvState

          [streaming-java]

          • Adds a KvStateRegistry per TaskManager at which created KvState instances are
            registered/unregistered.
          • Registered KvState instances are reported to the JobManager, which can be
            queried for KvStateLocation.

          commit 3405e4a5fc3429f211b5ed0e4d6da394f7d5bb4e
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T12:00:49Z

          FLINK-3779 [runtime] Add KvState network client and server

          • Adds a Netty-based server and client to query KvState instances, which have
            been published to the KvStateRegistry.

          commit bb4baa33aaca8475d46da4c6c0ec3cbecbefa81a
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T12:08:03Z

          FLINK-3779 [runtime] Add KvStateLocation lookup service

          • Adds an Akka-based KvStateLocation lookup service to be used by the client
            to look up location information.

          commit 7256e1dae34d6dbb8d96560bd0eac7a51ced7515
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T12:08:24Z

          FLINK-3779 [runtime] Add QueryableStateClient

          • Adds a client, which works with the network client and location lookup service
            to query KvState instances.
          • Furthermore, location information is cached.

          commit d7e602a55a4cfa6825b2126d51ed711d3c6ea866
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-05-30T12:08:34Z

          FLINK-3779 [streaming-java, streaming-scala] Add QueryableStateStream to KeyedStream

          [runtime, test-utils, tests]

          • Exposes queryable state on the API via KeyedStream#asQueryableState(String, StateDescriptor).
            This creates an operator, which consumes the keyed stream and exposes the stream
            as queryable state.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222548679

          Awesome feature Ufuk, I am very excited to try this out and give some feedback

          So, if I understand correctly, in order to query the state I can use the queryable state stream. Would it also be possible to query the state from an arbitrary operator with the same logic (from a map, for instance)? Also, what happens if I call setQueryable... on a ValueStateDescriptor when creating a state in my UDF?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222554801

          Hey Gyula!

          The `QueryableStateStream` is only for convenience. It just creates an `AbstractQueryableStateOperator`, which sets up an operator that consumes data and calls the `state.update()` or `state.add()` method, respectively.

          You can use the `QueryableStateClient` anywhere you like, including your map operator. Just be aware that further Threads will be started for the network communication (configurable). You can share it between operators though.

          You can make any `KvState` queryable by calling the `setQueryable(String)` method of its state descriptor.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222600015

          Thanks for the explanation. I will start testing/reviewing this later today.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222606282

          I will gradually add some questions/comments as I go

          1. Do we really need a QueryableStateStream exposed in the API? As you said this is just a pretty basic sink that anyone can probably inline who knows Flink enough to use the States. And I am guessing looking inside other operators is probably the most interesting use-case for this new feature.

          (2. If we decide to keep the QueryableStateStream could it be just implemented as a simple RichSink? stream.addSink(...) instead of adding another operator to the runtime layer)

          3. I think it would be great to make the KvStateClient somehow aware of the types, because passing key/namespace byte arrays will be confusing for users, as they will have no idea what the namespace is. The namespaces are pretty internal to the system. Maybe we could allow users to register serializers for state ids in the KvStateClient so they would not need to manually pass byte arrays and could work with the actual keys. Also, if they don't want to specify the namespace, we could then serialize it with the VoidSerializer, as usually happens in the runtime.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222625496

          1. I think that you are very much looking at it with your specific use case in mind, which is (I think) querying other operators from within your operators. To me, that's not the main use case for queryable state though... For that, it's true that the `QueryableStateStream` does not provide much help. If you are already using partitioned state manually, then you will probably go with the `setQueryable` method, that's true. But for some state like Flink's internal windows, you don't have access to the `StateDescriptor` (which is currently not exposed for queries on the API though). Furthermore, I think it's good to provide a low barrier way of doing things. But if others feel the same, I'm certainly OK with removing it.

          2. I think that would be possible, yes. I agree that the `QueryableStateStream` is conceptually similar to a sink.

          3. I agree, but that was on purpose for the first version until we figure out how to expose it properly. You have utilities in `KvStateRequestSerializer` to serialize key and namespace and you can use `QueryableStateStream` to access the key and value serializer. Namespace is usually `VoidNamespace` if you are not querying windows.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222627001

          You are probably right that I am a little biased regarding (1), sorry.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the pull request:

          https://github.com/apache/flink/pull/2051#issuecomment-222648249

          Hi,
          just some high-level remarks about the API of `KeyedStream` and `QueryableStateStream`. You could parameterize them both by the State type. This way you would also get a very nice way of generically accessing the different types of state on the query side. Let me quickly show what I mean. `KeyedStream` would have this method:

          ```
          <S extends State> QueryableStateStream<KEY, S> asQueryableState(
          String queryableStateName,
          StateDescriptor<S, ?> stateDescriptor);
          ```

          the signature of `QueryableStateStream` would be like this:

          ```
          public class QueryableStateStream<K, S extends State> {
            private StateDescriptor<S, ?> stateDescriptor;

            /**
             * Read the state represented by this stream using a state client
             */
            public S read(K key, QueryableStateClient client) {
              return stateDescriptor.bind(new QueryStateBinder(client.read(key, stateName)));
            }
          }
          ```

          The nice thing about `StateDescriptor.bind()` is that you can use this to create a state reader based on the state type. You pass in a custom `StateBackend` (this should really be called `StateBinder`, btw). That constructs a state reader that deserializes the bytes read from the state client.
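To make the binder idea concrete, here is a minimal, self-contained sketch of the pattern. All types below (`State`, `ValueState`, `StateBinder`, `ValueStateDescriptor`, `QueryStateBinder`, `BinderSketch`) are simplified, hypothetical stand-ins written for this illustration; they are not Flink's actual interfaces, and the `Function<byte[], T>` deserializer is an assumption standing in for a real type serializer.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical, simplified stand-ins; NOT Flink's real interfaces.
interface State {}

interface ValueState<T> extends State {
    T value();
}

/** Callback that decides how a described state is materialised. */
interface StateBinder {
    <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor);
}

abstract class StateDescriptor<S extends State> {
    /** Double dispatch: the descriptor picks the binder method for its state type. */
    abstract S bind(StateBinder binder);
}

class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> {
    final Function<byte[], T> deserializer; // assumption: stands in for a serializer

    ValueStateDescriptor(Function<byte[], T> deserializer) {
        this.deserializer = deserializer;
    }

    @Override
    ValueState<T> bind(StateBinder binder) {
        return binder.createValueState(this);
    }
}

/** Binder that wraps the bytes returned by a query and decodes them on read. */
class QueryStateBinder implements StateBinder {
    private final byte[] serializedValue;

    QueryStateBinder(byte[] serializedValue) {
        this.serializedValue = serializedValue;
    }

    @Override
    public <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor) {
        return () -> descriptor.deserializer.apply(serializedValue);
    }
}

class BinderSketch {
    /** Reads a Long value out of raw query result bytes via the descriptor. */
    static long readLong(byte[] queryResult) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>(bytes -> ByteBuffer.wrap(bytes).getLong());
        return descriptor.bind(new QueryStateBinder(queryResult)).value();
    }
}
```

The point of the sketch is that the query side never switches on the state type itself: the descriptor calls the matching binder method, and the binder returns a read-only view over the queried bytes.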

          githubbot ASF GitHub Bot added a comment -

          Github user soniclavier commented on the issue:

          https://github.com/apache/flink/pull/2051

          Hi,

          Continuing the discussion from the mailing list, I was able to get past the NettyConfig problem once I ran Flink in cluster mode (I would still like to know if there is a way to run in local mode so that I can avoid running SBT assembly every time).

          But now I am stuck at the error message "KvState does not hold any state for key/namespace.", which I believe is caused by my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`.

          My key is a tuple of (Long,String) and this is the naive serializer that I wrote (it is probably wrong; I have never written a serializer before):

          ```
          class KeySerializer extends TypeSerializerSingleton[(Long,String)]{

            private val EMPTY: (Long,String) = (0,"")

            override def createInstance(): (Long, String) = EMPTY

            override def getLength: Int = return 2;

            override def canEqual(o: scala.Any): Boolean = return o.isInstanceOf[(Long,String)]

            override def copy(t: (Long, String)): (Long, String) = t

            override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

            override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
              dataOutputView.writeLong(dataInputView.readLong())
              StringValue.copyString(dataInputView,dataOutputView)
            }

            override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
              dataOutputView.writeLong(t._1)
              StringValue.writeString(t._2,dataOutputView)
            }

            override def isImmutableType: Boolean = true

            override def deserialize(dataInputView: DataInputView): (Long, String) = {
              val l = dataInputView.readLong()
              val s = StringValue.readString(dataInputView)
              (l,s)
            }

            override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) = deserialize(dataInputView)
          }
          ```

          Can you tell me what I am doing wrong here? Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user soniclavier commented on the issue:

          https://github.com/apache/flink/pull/2051

          Never mind, I was querying with the wrong key. It works now! Cheers.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2051

          Regarding local vs. cluster mode: that's on purpose, but we can certainly change that behaviour. For now, you would have to run in cluster mode.

          Regarding the serializer: assuming that it is a Flink `Tuple2<Long, String>` you can use the following to get the serializer:

          ```java
          TypeSerializer<?>[] fieldSerializers = new TypeSerializer[] {
              StringSerializer.INSTANCE, LongSerializer.INSTANCE
          };

          TypeSerializer<Tuple2<String, Long>> serializer = new TupleSerializer<>(
              (Class<Tuple2<String, Long>>) (Class<?>) Tuple2.class, fieldSerializers);
          ```

          *Just to make sure that we are on the same page: the state of this PR is not the final queryable state API, but only the initial low-level version.* Really looking forward to further feedback. Thank you for trying it out at this stage.
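Since the client passes the key to the job as serialized bytes, the serializer used on the client side has to produce exactly the bytes the job would produce for the same key, including the order and types of the tuple fields. The following stdlib-only sketch illustrates that point. The byte layout and the class `KeyBytesSketch` are assumptions made up for this illustration; this is not the wire format of Flink's `TupleSerializer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoding for illustration only; NOT Flink's TupleSerializer format.
class KeyBytesSketch {

    /** Encodes a (long, String) key: 8-byte long, then length-prefixed UTF-8. */
    static byte[] encodeLongString(long id, String name) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES + utf8.length)
            .putLong(id)
            .putInt(utf8.length)
            .put(utf8)
            .array();
    }

    /** Encodes the same fields in the opposite order: String first, then long. */
    static byte[] encodeStringLong(String name, long id) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + utf8.length + Long.BYTES)
            .putInt(utf8.length)
            .put(utf8)
            .putLong(id)
            .array();
    }
}
```

Both methods carry the same information in the same number of bytes, yet the resulting arrays differ, so a lookup keyed on one encoding would not match state stored under the other.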

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2051

          A simpler way to get the serializer may be
          ```java
          TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}).createSerializer(null);
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user soniclavier commented on the issue:

          https://github.com/apache/flink/pull/2051

          Thanks Ufuk & Stephan for the reply,

          I tried the serializers you suggested:
          ```
          val typeHint = new TypeHint[Tuple2[Long,String]](){}
          val serializer = TypeInformation.of(typeHint).createSerializer(null)

          //also tried this
          val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE)
          val serializer2 = new TupleSerializer(classOf[Tuple2[Long,String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]], fieldSerializers)
          ```

          But both give me a compilation error at
          ```
          val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
          key,
          serializer2,
          VoidNamespace.INSTANCE,
          VoidNamespaceSerializer.INSTANCE)
          ```
          the compilation error is:
          ```
          Error:(43, 7) type mismatch;
          found : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
          required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
          Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
          You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
          serializer,
          ^
          ```

          I had seen this before when I tried to set the serializer from `queryableState.getKeySerializer`

          Note : It works fine when I use the longer version of serializer that I created.

          githubbot ASF GitHub Bot added a comment -

          Github user soniclavier commented on the issue:

          https://github.com/apache/flink/pull/2051

          Sorry, the compilation error was because the Tuple2 was scala.Tuple2, not Flink's Tuple2. Changing to `org.apache.flink.api.java.tuple.Tuple2` fixed the issue.

          githubbot ASF GitHub Bot added a comment -

          Github user soniclavier commented on the issue:

          https://github.com/apache/flink/pull/2051

          One more question: is it possible to configure the JobManager actor path that the client connects to? It looks like it defaults to `akka://flink/user/jobmanager`.
          That way I could create a much more generic client.

          Note: I know this is the initial version, I am just curious whether this is already implemented.

          githubbot ASF GitHub Bot added a comment -

          Github user soumyasd commented on the issue:

          https://github.com/apache/flink/pull/2051

          Any idea which Flink version this feature will ship with?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2051

          @soniclavier I think this is not configurable in Flink at the moment. The client uses the `LeaderRetrievalService` to retrieve the job manager path.

          @soumyasd I hope to merge this after the 1.1 fork-off this week. This would mean that it would be part of the 1.2 release (~3 months if everything goes according to release schedule).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2051

          Now that we have forked off the 1.1 release branch, I would like to merge this if there are no objections. There are not many changes to our current code base and the follow-ups can be addressed before the 1.2 release.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2051

          Good from my side.

          githubbot ASF GitHub Bot added a comment -

          Github user rehevkor5 commented on the issue:

          https://github.com/apache/flink/pull/2051

          Hi, it's great to see that someone is working on this stuff!

          I just wanted to put in my two cents, to provide a different perspective that might change how you are thinking about this.

          On my project, we are interested in incorporating pre-computed historical time-series data into the values within a time window. Those values would need to be loaded from a distributed database such as Cassandra or DynamoDB. Also, we would like for newly computed time-series data points (produced by a Flink window pane) to be persisted externally, side-by-side with the historical data (in Cassandra/DynamoDB).

          In contrast with your approach, which enables querying of state from within Flink, we are more interested in querying that state from the external database. This allows the Flink job to produce time series data which can be queried ad-hoc in the database, while also allowing the Flink job to produce pre-calculated aggregates from that time series.

          I believe others have mentioned in this thread the need, therefore, to allow the State Store to choose the serialization approach. While serializing to byte[] works well for Memory and RocksDB State Stores, inserting into a NoSQL database requires creation of an INSERT command with data that includes primary/partition key, secondary/range key, and arbitrarily structured data (one column of byte[], or perhaps more complex based on the particular type of value). In particular, we need the timestamp of the time series point to be a top-level value in the INSERT, so that time range queries can be efficient. The interface is also important when it comes to Flink loading pre-existing data, because Flink or an integration layer will need to know how to query for the particular keys it is looking for.

          I hope that makes sense & gives some perspective on what some people are thinking about with regard to "queryable state".
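As a rough illustration of why the timestamp needs to be a top-level part of the stored row rather than hidden inside an opaque byte[] value, here is a small in-memory sketch. The class `TimeSeriesTableSketch` and its methods are hypothetical and only mimic a partition key plus sorted range key layout; they do not correspond to any Flink, Cassandra, or DynamoDB API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a table with a partition key and a sorted range key.
class TimeSeriesTableSketch {

    // partition key (series) -> rows sorted by range key (timestamp in millis)
    private final Map<String, NavigableMap<Long, byte[]>> partitions = new HashMap<>();

    /** Stores one data point; the timestamp is a key column, the value stays opaque. */
    void insert(String seriesKey, long timestampMillis, byte[] value) {
        partitions
            .computeIfAbsent(seriesKey, k -> new TreeMap<>())
            .put(timestampMillis, value);
    }

    /** Range query on the sorted key; no stored value has to be deserialized. */
    List<Long> timestampsInRange(String seriesKey, long fromInclusive, long toExclusive) {
        NavigableMap<Long, byte[]> rows = partitions.get(seriesKey);
        if (rows == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(
            rows.subMap(fromInclusive, true, toExclusive, false).keySet());
    }
}
```

If the timestamp were only available inside the serialized value, every row of the partition would have to be read and decoded to answer the same range query.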

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2051

          uce Ufuk Celebi added a comment -

          Implemented in a909adb~1..490e7eb (master).


            People

            • Assignee:
              uce Ufuk Celebi
              Reporter:
              uce Ufuk Celebi
            • Votes:
              1
              Watchers:
              7