Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6517

Support multiple consecutive windows

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      FLINK-5884 changed the way how windows can be defined, however, it is not possible to define multiple consecutive windows right now. It should be possible to refine the end property of a window as a new time attribute.

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user twalthr opened a pull request:

          https://github.com/apache/flink/pull/3897

          FLINK-6517 [table] Support multiple consecutive windows

          This PR adds support for multiple consecutive windows for the Table API. SQL requires additional group auxiliary functions. I will open a followup issue for that.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/twalthr/flink FLINK-6517

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3897.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3897


          commit faa2646e73039f72f9849efe8eb36f0f2abb793a
          Author: twalthr <twalthr@apache.org>
          Date: 2017-05-12T08:03:25Z

          FLINK-6517 [table] Support multiple consecutive windows


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/3897 FLINK-6517 [table] Support multiple consecutive windows This PR adds support for multiple consecutive windows for the Table API. SQL requires additional group auxiliary functions. I will open a followup issue for that. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-6517 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3897.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3897 commit faa2646e73039f72f9849efe8eb36f0f2abb793a Author: twalthr <twalthr@apache.org> Date: 2017-05-12T08:03:25Z FLINK-6517 [table] Support multiple consecutive windows
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3897#discussion_r116436281

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala —
          @@ -28,7 +28,7 @@ import org.apache.flink.table.functions.TableFunction
          import org.apache.flink.table.plan.logical.TumblingGroupWindow
          import org.apache.flink.table.utils.TableTestBase
          import org.apache.flink.table.utils.TableTestUtil._
          -import org.junit.Test
          +import org.junit.

          {Ignore, Test}

          — End diff –

          `Ignore` is unused

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3897#discussion_r116436281 — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala — @@ -28,7 +28,7 @@ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil._ -import org.junit.Test +import org.junit. {Ignore, Test} — End diff – `Ignore` is unused
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3897#discussion_r116433771

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala —
          @@ -39,20 +49,20 @@ abstract class WindowProperty(child: Expression) extends UnaryExpression

          { ValidationFailure("Child must be a window reference.") }
          • private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
            + def toNamedWindowProperty(name: String)
              • End diff –

          define in one line?

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3897#discussion_r116433771 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala — @@ -39,20 +49,20 @@ abstract class WindowProperty(child: Expression) extends UnaryExpression { ValidationFailure("Child must be a window reference.") } private [flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder) + def toNamedWindowProperty(name: String) End diff – define in one line?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3897#discussion_r116436437

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala —
          @@ -28,13 +28,13 @@ import org.apache.flink.streaming.api.watermark.Watermark
          import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
          import org.apache.flink.table.api.scala._
          import org.apache.flink.table.api.scala.stream.utils.StreamITCase
          -import org.apache.flink.table.api.

          {TableEnvironment, TableException, Types}

          +import org.apache.flink.table.api.

          {TableEnvironment, TableException, Types, ValidationException}

          import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
          import org.apache.flink.table.expressions.TimeIntervalUnit
          import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
          import org.apache.flink.types.Row
          import org.junit.Assert._
          -import org.junit.Test
          +import org.junit.

          {Ignore, Test}

          — End diff –

          `Ignore` is unused

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3897#discussion_r116436437 — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala — @@ -28,13 +28,13 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.utils.StreamITCase -import org.apache.flink.table.api. {TableEnvironment, TableException, Types} +import org.apache.flink.table.api. {TableEnvironment, TableException, Types, ValidationException} import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc import org.apache.flink.table.expressions.TimeIntervalUnit import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark import org.apache.flink.types.Row import org.junit.Assert._ -import org.junit.Test +import org.junit. {Ignore, Test} — End diff – `Ignore` is unused
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3897#discussion_r116433350

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala —
          @@ -132,31 +134,61 @@ case class WindowReference(name: String) extends Attribute

          { throw new ValidationException("Cannot rename window reference.") }

          }
          +
          + override def toString: String = s"'$name"
          }

          abstract class TimeAttribute(val expression: Expression)
          extends UnaryExpression

          • with NamedExpression {
            + with WindowProperty {

          override private[flink] def child: Expression = expression
          -

          • override private[flink] def name: String = expression match { - case UnresolvedFieldReference(name) => name - case _ => throw new ValidationException("Unresolved field reference expected.") - }

            -

          • override private[flink] def toAttribute: Attribute =
          • throw new UnsupportedOperationException("Time attribute can not be used solely.")
            }

          case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {

          • override private[flink] def resultType: TypeInformation[_] =
            + override private[flink] def validateInput(): ValidationResult = {
            + child match {
            + case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) =>
            + ValidationFailure("A proctime window can not guarantee a rowtime attribute.")
              • End diff –

          -> "A proctime window cannot provide a rowtime attribute"?

          Show
          githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3897#discussion_r116433350 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala — @@ -132,31 +134,61 @@ case class WindowReference(name: String) extends Attribute { throw new ValidationException("Cannot rename window reference.") } } + + override def toString: String = s"'$name" } abstract class TimeAttribute(val expression: Expression) extends UnaryExpression with NamedExpression { + with WindowProperty { override private [flink] def child: Expression = expression - override private [flink] def name: String = expression match { - case UnresolvedFieldReference(name) => name - case _ => throw new ValidationException("Unresolved field reference expected.") - } - override private [flink] def toAttribute: Attribute = throw new UnsupportedOperationException("Time attribute can not be used solely.") } case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) { override private [flink] def resultType: TypeInformation [_] = + override private [flink] def validateInput(): ValidationResult = { + child match { + case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) => + ValidationFailure("A proctime window can not guarantee a rowtime attribute.") End diff – -> "A proctime window cannot provide a rowtime attribute"?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3897

          Thanks for the review @fhueske. I will merge this...

          Show
          githubbot ASF GitHub Bot added a comment - Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3897 Thanks for the review @fhueske. I will merge this...
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3897

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3897
          Hide
          twalthr Timo Walther added a comment -

          Fixed in 1.4: c86f46cdc1c0e2be30876ef358e7ab3c1daa14e9
          Fixed in 1.3: e23328e4ad4a370af94b1a7441dfcf356eda62f5

          Show
          twalthr Timo Walther added a comment - Fixed in 1.4: c86f46cdc1c0e2be30876ef358e7ab3c1daa14e9 Fixed in 1.3: e23328e4ad4a370af94b1a7441dfcf356eda62f5

            People

            • Assignee:
              twalthr Timo Walther
              Reporter:
              twalthr Timo Walther
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development