Flink / FLINK-6023

Fix Scala snippet into Process Function (Low-level Operations) Doc

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Documentation
    • Labels: None

      Description

      The current `/docs/dev/stream/process_function.md` contains several errors in its Scala snippet.
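      For orientation, here is a hedged sketch of what the corrected snippet could look like, folding in the points raised in the review comments below (class name consistent with the Java example, `(String, String)` input type, no shadowed `key`). It targets the Flink 1.2/1.3-era API (`RichProcessFunction`, `ValueStateDescriptor`); the body of `processElement` is illustrative, not the literal merged text.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.RichProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.util.Collector

// the data type stored in state
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

// named CountWithTimeoutFunction for consistency with the Java example;
// the input type is (String, String) to match the source stream
class CountWithTimeoutFunction extends RichProcessFunction[(String, String), (String, Long)] {

  // keyed state, initialized lazily; note classOf (the original had the typo `clasOf`)
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

  override def processElement(value: (String, String), ctx: Context,
                              out: Collector[(String, Long)]): Unit = {
    // value._1 is used directly, avoiding a `key` binding that the
    // match pattern below would shadow
    val current = state.value() match {
      case null => CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, _) => CountWithTimestamp(key, count + 1, ctx.timestamp)
    }
    state.update(current)
  }
}
```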

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3510

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/3510

          Thanks @maocorte for the quick fix, looks good to merge now.

          githubbot ASF GitHub Bot added a comment -

          Github user maocorte commented on the issue:

          https://github.com/apache/flink/pull/3510

          Thank you @KurtYoung for your review.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105422108

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {

   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-    .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
     // initialize or retrieve/update the state
+    val (key, _) = value
```

          — End diff —

          Sorry, I didn't make myself clear. What the IDE complains about is that the variable name `key` conflicts with the following lines:

```scala
case CountWithTimestamp(key, count, _) =>
  CountWithTimestamp(key, count + 1, ctx.timestamp)
```

          It's not clear whether you want to use the `key` you just defined or the `key` in the match pattern.
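          The shadowing in question can be reproduced in plain Scala, independent of Flink. A minimal sketch (the function name and constant timestamp are invented for illustration; the warning text quoted in the comments comes from an IDE inspection):

```scala
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

def update(value: (String, Long), stored: CountWithTimestamp): CountWithTimestamp = {
  val (key, _) = value  // binds an outer `key`
  stored match {
    // `key` here is a fresh pattern binding that silently shadows the outer
    // `key` above -- the IDE flags "suspicious shadowing by a variable pattern"
    case CountWithTimestamp(key, count, _) =>
      CountWithTimestamp(key, count + 1, 0L)
  }
}
```

          Renaming one of the bindings (or using `value._1` directly) removes the ambiguity.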

          githubbot ASF GitHub Bot added a comment -

          Github user maocorte commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105420747

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {

   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-    .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
     // initialize or retrieve/update the state
+    val (key, _) = value
```

          — End diff —

          Thank you @KurtYoung for the report. Is it ok with you if I change it to `val (key, count) = value`? Personally I don't like using `_1` because I think it's not so clear, but that's probably just my preference.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105420103

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {

   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-    .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
```

          — End diff —

          I think it should be `value: (String, String)`.
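          To make the mismatch concrete (a sketch, not the literal doc text): earlier in the same snippet the source stream carries `(String, String)` elements, so the function applied to it must declare that as its input type.

```scala
// element type of the source stream in the snippet:
val stream: DataStream[(String, String)] = ...

// so the first type parameter -- and processElement's `value` -- must be
// (String, String), not (String, Long):
class CountWithTimeoutFunction extends RichProcessFunction[(String, String), (String, Long)] {
  override def processElement(value: (String, String), ctx: Context,
                              out: Collector[(String, Long)]): Unit = {
    // ...
  }
}
```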

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105420011

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {
```

          — End diff —

          And the first type parameter of the ProcessFunction should be `(String, String)`.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105419777

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {
```

          — End diff —

          The class name should be changed to `CountWithTimeoutFunction` to be consistent with the Java example.

          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105418830

          — Diff: docs/dev/stream/process_function.md —

```diff
@@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.RichProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector

 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...

 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())

 /**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)] {

   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-    .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))

   override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
     // initialize or retrieve/update the state
+    val (key, _) = value
```

          — End diff —

          Can we change the name here? The IDE reports `suspicious shadowing by a variable pattern`. Alternatively, you could just use `value._1` instead.
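
          For context, the warning fires because a lowercase identifier in a pattern such as `val (key, _) = value` binds a fresh variable that can shadow an existing `key` in scope. A minimal, Flink-free sketch of the two alternatives the comment suggests (names like `currentKey` are illustrative, not from the doc):

          ```scala
          // Runnable as a Scala script; no Flink dependencies needed.
          val value: (String, Long) = ("hello", 1L)

          // Option 1: destructure with a fresh, unambiguous name so nothing in scope is shadowed
          val (currentKey, _) = value

          // Option 2: skip destructuring entirely and read the tuple field directly
          val directKey = value._1

          // Both approaches yield the same key
          assert(currentKey == directKey)
          println(currentKey)  // prints "hello"
          ```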

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3510#discussion_r105417009

          — Diff: docs/dev/stream/process_function.md —
          @@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,

          <div data-lang="scala" markdown="1">

          {% highlight scala %}

          -import org.apache.flink.api.common.state.ValueState;
          -import org.apache.flink.api.common.state.ValueStateDescriptor;
          -import org.apache.flink.streaming.api.functions.ProcessFunction;
          -import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
          -import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
          -import org.apache.flink.util.Collector;
          +import org.apache.flink.api.common.state.ValueState
          +import org.apache.flink.api.common.state.ValueStateDescriptor
          +import org.apache.flink.streaming.api.functions.RichProcessFunction
          — End diff –

          RichProcessFunction no longer exists, can you change it back to ProcessFunction? And there are some `RichProcessFunction`s in the java snippet, can you try to fix that too?
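
          To illustrate the structural change being requested, here is a minimal sketch of the corrected shape. The `ProcessFunction` trait below is a hand-rolled stand-in that only mimics the signature of Flink's class, so the example runs without Flink on the classpath; in the real doc the class would extend `org.apache.flink.streaming.api.functions.ProcessFunction` directly:

          ```scala
          // Stand-in for Flink's ProcessFunction[I, O]; illustrative only, not the real API.
          abstract class ProcessFunction[I, O] {
            def processElement(value: I, out: O => Unit): Unit
          }

          // The doc's class should extend ProcessFunction, not the removed RichProcessFunction.
          class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
            override def processElement(value: (String, Long), out: ((String, Long)) => Unit): Unit =
              out((value._1, value._2 + 1))  // emit the key with an incremented count
          }

          // Drive the function once and capture its output
          var captured: Option[(String, Long)] = None
          new TimeoutStateFunction().processElement(("k", 1L), p => captured = Some(p))
          println(captured)  // prints "Some((k,2))"
          ```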

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user maocorte opened a pull request:

          https://github.com/apache/flink/pull/3510

          FLINK-6023 Fix Scala snippet into Process Function Doc

          The pull request references the related JIRA issue "FLINK-6023 Fix Scala snippet into Process Function (Low-level Operations) Doc".
          The scope is to fix the java and scala examples.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/radicalbit/flink FLINK-6023

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3510.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3510




            People

            • Assignee: Mauro Cortellazzi (mao.corte@gmail.com)
            • Reporter: Mauro Cortellazzi (mao.corte@gmail.com)
            • Votes: 0
            • Watchers: 2
