Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      CALCITE-1603 and CALCITE-1615 introduce support for the TUMBLE / HOP / SESSION windows in the parser.

      This JIRA tracks the effort to add the corresponding support to the planner / optimizer in Flink.
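
For readers unfamiliar with the window types named above, the following is a minimal sketch of how TUMBLE and HOP windows assign a timestamp to windows. It is not Flink or Calcite code: `WindowAssignment`, `TimeWindow`, `tumble`, and `hop` are hypothetical names introduced only for illustration, and timestamps are assumed to be plain milliseconds. SESSION windows are omitted here because they depend on the gap between neighboring events rather than on a fixed grid.

```scala
// Illustrative only: these helpers are NOT part of Flink or Calcite.
object WindowAssignment {

  /** A half-open time window [start, end), in milliseconds. */
  final case class TimeWindow(start: Long, end: Long)

  /** TUMBLE: every timestamp falls into exactly one window of length `size`. */
  def tumble(ts: Long, size: Long): TimeWindow = {
    require(size > 0, "size must be positive")
    // floorMod keeps the alignment correct for negative timestamps too.
    val start = ts - Math.floorMod(ts, size)
    TimeWindow(start, start + size)
  }

  /** HOP: a window of length `size` starts every `slide` ms, so one timestamp
    * can belong to several overlapping windows (or to none if size < slide). */
  def hop(ts: Long, slide: Long, size: Long): Seq[TimeWindow] = {
    require(slide > 0 && size > 0, "slide and size must be positive")
    val lastStart = ts - Math.floorMod(ts, slide)
    // Walk backwards over window starts while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start + size > ts)
      .map(start => TimeWindow(start, start + size))
      .toSeq
      .reverse
  }
}
```

Under these assumptions, `WindowAssignment.tumble(125L, 100L)` yields the single window [100, 200), while `WindowAssignment.hop(125L, 50L, 100L)` yields the two overlapping windows [50, 150) and [100, 200).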

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user haohui opened a pull request:

          https://github.com/apache/flink/pull/3665

          FLINK-6011 Support TUMBLE, HOP, SESSION window in streaming SQL.

          This PR adds support for the `TUMBLE`, `HOP`, and `SESSION` windows in Flink.

          Support for the WindowStart and WindowEnd expressions is deferred to FLINK-6012.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/haohui/flink FLINK-6011

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3665.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3665


          commit ef54aea409ea95feb04e651a0fde2fd4b620d888
          Author: Haohui Mai <wheat9@apache.org>
          Date: 2017-04-03T21:32:09Z

          FLINK-6011 Support TUMBLE, HOP, SESSION window in streaming SQL.


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3665

          Thanks for the PR @haohui!
          +1 to merge.
          I've been waiting for this feature

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3665

          So we finally got those supported by Calcite 1.12? Really excited to see those features supported in flinkSQL. Thanks @haohui.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3665#discussion_r109615593

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---
          @@ -350,7 +350,16 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
               SqlStdOperatorTable.EXISTS,
               // EXTENSIONS
               EventTimeExtractor,
          -    ProcTimeExtractor
          +    ProcTimeExtractor,
          +    SqlStdOperatorTable.TUMBLE,
          +    SqlStdOperatorTable.TUMBLE_START,
          --- End diff --

          Do we already support the `START/END` functions? We should leave them unsupported until they are implemented and tested.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3665#discussion_r109616209

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---
          @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
             }

             private def identifyWindow(field: RexNode): Option[Window] = {
          -    // Detects window expressions by pattern matching
          -    // supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
          -    // with time being equal to proctime() or rowtime()
               field match {
                 case call: RexCall =>
                   call.getOperator match {
          -          case _: SqlFloorFunction =>
          -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
          -            val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange]
          -            val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
          -            call.getType match {
          -              case TimeModeTypes.PROCTIME =>
          -                return Some(w)
          -              case TimeModeTypes.ROWTIME =>
          -                return Some(w.on("rowtime"))
          -              case _ =>
          -            }
          -          case _ =>
          +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
          +          case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow
          +          case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow
          +          case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow
          +          case _ => None
                   }
          -      case _ =>
          +      case _ => None
               }
          -    None
             }
          -
           }

          -object LogicalWindowAggregateRule {
          +private abstract class WindowTranslator {
          +  val call: RexCall

          -  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
          -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
          +  protected def unwrapLiteral[T](node: RexNode): T =
          +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
          -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
          +  protected def getOperandAsLong(idx: Int): Long =
          +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
          --- End diff --

          Does Calcite ensure that the operands can only be literals and never input references?

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3665#discussion_r109810006

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala —

          I just tried it out: Calcite does not stop you from passing something like a RexCall. So yes, it can be dynamic.

          One question: does Flink actually support a `GroupWindow` that has a dynamic size? Maybe I'm wrong, but it does not seem so.

          If the answer is no, maybe we should check whether the operand is a `RexLiteral`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3665#discussion_r109856293

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala —

          Flink only supports windows with a fixed configuration (SESSION windows have variable length, but the gap parameter is fixed). I'm also not sure that variable parameters would make sense. It's quite hard to reason about the behavior of a window with variable parameters, IMO.
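
To illustrate the point about SESSION windows, here is a minimal sketch of how a fixed gap still produces windows of different lengths. It is not Flink's implementation: `SessionWindows`, `Session`, and `sessions` are hypothetical names, timestamps are assumed to be plain milliseconds, and all events are assumed to be available up front rather than arriving as a stream.

```scala
// Illustrative only: NOT Flink's session window implementation.
object SessionWindows {

  /** A session covering [start, end), where end = last event time + gap. */
  final case class Session(start: Long, end: Long)

  /** Groups timestamps into sessions separated by at least `gap` ms of inactivity. */
  def sessions(timestamps: Seq[Long], gap: Long): List[Session] = {
    require(gap > 0, "gap must be positive")
    timestamps.sorted.foldLeft(List.empty[Session]) {
      // The event arrives before the current session times out: extend it.
      case (current :: closed, ts) if ts < current.end =>
        current.copy(end = ts + gap) :: closed
      // Otherwise the gap has elapsed (or there is no session yet): open a new one.
      case (acc, ts) =>
        Session(ts, ts + gap) :: acc
    }.reverse
  }
}
```

With a gap of 10, the events 0, 5, 8, 30, 32 form the sessions [0, 18) and [30, 42): the gap is the same for both, but the resulting window lengths (18 and 12) differ.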

          githubbot ASF GitHub Bot added a comment -

          Github user haohui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3665#discussion_r109857875

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala —

          Agreed. 97d1a45 will throw a `TableException` if the configuration is not fixed.
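
The kind of check discussed in this thread could look roughly like the sketch below. It does not reproduce commit 97d1a45 and uses no Calcite or Flink classes: `Node`, `Literal`, `InputRef`, `Call`, and `WindowConfigException` are hypothetical stand-ins (for `RexNode`, `RexLiteral`, an input reference, `RexCall`, and `TableException` respectively), introduced only so the example is self-contained.

```scala
// Illustrative only: hypothetical stand-ins, NOT Calcite's or Flink's classes.
object LiteralOperands {

  sealed trait Node
  final case class Literal(value: BigDecimal) extends Node               // stands in for RexLiteral
  final case class InputRef(index: Int) extends Node                     // stands in for an input reference
  final case class Call(operator: String, operands: List[Node]) extends Node // stands in for RexCall

  /** Stands in for Flink's TableException. */
  final class WindowConfigException(message: String) extends RuntimeException(message)

  /** Reads operand `idx` as a Long, rejecting anything that is not a literal
    * instead of failing later with a ClassCastException. */
  def operandAsLong(call: Call, idx: Int): Long =
    call.operands(idx) match {
      case Literal(value) => value.toLong
      case other =>
        throw new WindowConfigException(
          s"Operand $idx of ${call.operator} must be a literal, but got: $other")
    }
}
```

Compared with an unchecked cast, matching on the operand first turns a dynamic window size into an explicit, descriptive error at planning time.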

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3665

          Thanks for the update @haohui!
          Will merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3665

          fhueske Fabian Hueske added a comment -

          Implemented with 876107698aa2d601d5f398ea56fd93eeb03b117a


            People

            • Assignee:
              wheat9 Haohui Mai
              Reporter:
              wheat9 Haohui Mai
            • Votes:
              0
              Watchers:
              4
