  Flink / FLINK-14354

Provide interfaces instead of abstract classes in org.apache.flink.state.api.functions


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / State Processor
    • Labels: None

      Description

      I've started using the new state processing API in Flink 1.9. It's super useful and works great for the most part.

      However, I think there is an opportunity to simplify implementations that use the API. My request is to provide interfaces instead of (or in addition to) the abstract classes in org.apache.flink.state.api.functions, and then to have the state processing API require those interfaces.
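A minimal sketch of what "have the API require interfaces" could mean, modeled in plain Scala with no Flink on the classpath. KeyedStateReader, StateReaderApi.readKeys, ExistingBase and UpperCaseReader are hypothetical names, not part of org.apache.flink.state.api, and a Scala trait stands in for a Java interface.

```scala
// Hypothetical, Flink-free model: none of these names exist in org.apache.flink.
import scala.collection.mutable.ArrayBuffer

trait Collector[T] { def collect(record: T): Unit }

// Interface-style contract (a Scala trait standing in for a Java interface).
trait KeyedStateReader[K, OUT] {
  def open(): Unit
  def readKey(key: K, out: Collector[OUT]): Unit
}

// Hypothetical entry point typed against the trait, not an abstract class.
object StateReaderApi {
  def readKeys[K, OUT](keys: Seq[K], reader: KeyedStateReader[K, OUT]): List[OUT] = {
    val buffer = ArrayBuffer.empty[OUT]
    val out = new Collector[OUT] {
      def collect(record: OUT): Unit = { buffer += record; () }
    }
    reader.open() // lifecycle hook runs once before any key is read
    keys.foreach(key => reader.readKey(key, out))
    buffer.toList
  }
}

// A class that already extends another base class...
abstract class ExistingBase {
  var opened = false
  def open(): Unit = { opened = true }
}

// ...can still satisfy the contract, reusing the inherited open().
class UpperCaseReader extends ExistingBase with KeyedStateReader[String, String] {
  def readKey(key: String, out: Collector[String]): Unit = out.collect(key.toUpperCase)
}
```

Because readKeys depends only on the trait, UpperCaseReader keeps its own superclass and still plugs into the entry point; with an abstract class in that position, it could not.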

      My use case involves maintaining and processing keyed state. This is accomplished with a KeyedProcessFunction:

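      import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
      import org.apache.flink.configuration.Configuration
      import org.apache.flink.streaming.api.functions.KeyedProcessFunction
      import org.apache.flink.util.Collector

      class BooleanProcess extends KeyedProcessFunction[String, String, String] {

        // Per-key flag; once true, every later element for that key is emitted.
        var bool: ValueState[Boolean] = _

        override def open(parameters: Configuration): Unit = {
          bool = getRuntimeContext.getState(new ValueStateDescriptor("boolean-state", classOf[Boolean]))
        }

        override def processElement(value: String, ctx: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
          if (bool.value) {
            out.collect(value)
          } else {
            // Set the flag with a small probability and emit the triggering element.
            if (Math.random < 0.005) {
              bool.update(true)
              out.collect(value)
            }
          }
        }
      }
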
      I then use a KeyedStateReaderFunction like this to inspect savepoints/checkpoints:

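      import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
      import org.apache.flink.configuration.Configuration
      import org.apache.flink.state.api.functions.KeyedStateReaderFunction
      import org.apache.flink.util.Collector

      class BooleanProcessStateReader extends KeyedStateReaderFunction[String, String] {

        // Must repeat the descriptor name used by BooleanProcess to read the same state.
        var bool: ValueState[Boolean] = _

        override def open(parameters: Configuration): Unit = {
          bool = getRuntimeContext.getState(new ValueStateDescriptor("boolean-state", classOf[Boolean]))
        }

        override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[String]): Unit = {
          out.collect(key)
        }
      }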

      Ideally, I would like my KeyedStateReaderFunction to look like this:

      class BooleanProcessStateReader extends BooleanProcess with KeyedStateReaderFunction[String, String] {

        // bool and open() would be inherited from BooleanProcess.
        override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[String]): Unit = {
          out.collect(key)
        }
      }

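      However, this can't be done with the current API: a JVM class, in Scala as in Java, can extend only one class. Since BooleanProcessStateReader already extends BooleanProcess, it cannot also extend KeyedStateReaderFunction, which is an abstract class rather than an interface.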

      The code savings are trivial in this example, but the change would make the state reader much easier to maintain: it would automatically inherit the state fields and lifecycle methods (such as open) from the function whose state it inspects, so the state descriptor would no longer be duplicated.
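A plain-Scala sketch of that maintenance benefit, with no Flink dependency. FakeRuntimeContext, KeyedStateReader and the Map-based bool field are hypothetical stand-ins for Flink's runtime context, an interface version of KeyedStateReaderFunction, and ValueState. Unlike the reader above, this one emits only keys whose flag is set, so the inherited state visibly reaches readKey.

```scala
// Hypothetical, Flink-free model of the issue's two classes.
trait Collector[T] { def collect(record: T): Unit }

// Stand-in for Flink's runtime context: state name -> (key -> flag).
class FakeRuntimeContext(val store: Map[String, Map[String, Boolean]])

// Stand-in for BooleanProcess: it alone declares the state name and open().
class BooleanProcess {
  val stateName = "boolean-state"
  var bool: Map[String, Boolean] = Map.empty
  def open(ctx: FakeRuntimeContext): Unit = {
    bool = ctx.store.getOrElse(stateName, Map.empty[String, Boolean])
  }
}

// Hypothetical interface-style reader contract.
trait KeyedStateReader[K, OUT] {
  def open(ctx: FakeRuntimeContext): Unit
  def readKey(key: K, out: Collector[OUT]): Unit
}

// The reader inherits stateName, bool and open() instead of repeating them.
class BooleanProcessStateReader extends BooleanProcess with KeyedStateReader[String, String] {
  def readKey(key: String, out: Collector[String]): Unit =
    if (bool.getOrElse(key, false)) out.collect(key)
}
```

The state name lives only in BooleanProcess, so renaming it there cannot leave the reader looking up a stale name.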


    People

    • Assignee: Unassigned
    • Reporter: Mitch Wasson (mlmitch)
    • Votes: 0
    • Watchers: 3
