Flink / FLINK-6602

Table source with defined time attributes allows empty string

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.1, 1.4.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      The attribute names returned by DefinedRowtimeAttribute and DefinedProctimeAttribute are not checked for empty strings.

        Activity

        lmalds Zhe Li added a comment -

        We could check that the rowtime and proctime attribute names are not empty strings in StreamTableEnvironment#validateAndExtractTimeAttributes, which is called by registerDataStreamInternal, for example with the following code (added at line 485):

        // the rowtime and proctime attribute names must not be empty strings
        if (rowtime.isDefined && rowtime.get._2.trim.isEmpty) {
          throw new TableException("The rowtime attribute must not be an empty string.")
        }

        // check the proctime name itself (not the rowtime name)
        if (proctime.isDefined && proctime.get._2.trim.isEmpty) {
          throw new TableException("The proctime attribute must not be an empty string.")
        }

        Is this right?

        twalthr Timo Walther added a comment -

        Yes, I would also check for null and change the message to "The name of the XXXX attribute must not be empty.".

        fhueske Fabian Hueske added a comment -

        I think we should allow null to indicate that no time attribute should be added.
        Otherwise we need TableSource implementations for all combinations of none, event-time, processing-time, and both.
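To illustrate this point, here is a minimal Scala sketch. It assumes simplified local stand-ins for the DefinedRowtimeAttribute and DefinedProctimeAttribute interfaces (only the getters are modeled), and the class ConfigurableTimeSource is hypothetical. Because each getter may return null to mean "no attribute of this kind", a single class covers all four combinations.

```scala
// Simplified stand-ins for Flink's interfaces (assumption: only the getters matter here).
trait DefinedRowtimeAttribute { def getRowtimeAttribute: String }
trait DefinedProctimeAttribute { def getProctimeAttribute: String }

// Hypothetical source: a None argument is exposed as null, i.e. "no time attribute of this kind".
class ConfigurableTimeSource(rowtime: Option[String], proctime: Option[String])
    extends DefinedRowtimeAttribute with DefinedProctimeAttribute {
  override def getRowtimeAttribute: String = rowtime.orNull
  override def getProctimeAttribute: String = proctime.orNull
}

object ConfigurableTimeSourceDemo {
  // The four combinations: none, event time, processing time, and both.
  val none = new ConfigurableTimeSource(None, None)
  val eventTime = new ConfigurableTimeSource(Some("rowtime"), None)
  val procTime = new ConfigurableTimeSource(None, Some("proctime"))
  val both = new ConfigurableTimeSource(Some("rowtime"), Some("proctime"))
}
```

Without the null convention, each of these four cases would need its own TableSource class that does or does not mix in the corresponding trait.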

        lmalds Zhe Li added a comment -

        Timo & Fabian,

        I have two questions to confirm.

        (1) I think this problem only appears when StreamTableSourceTable#getRowType is called, not in StreamTableEnvironment#validateAndExtractTimeAttributes, so we should add the check in getRowType (lines 42 and 50). Is that right?

        (2) Where timeSource.getRowtimeAttribute and timeSource.getProctimeAttribute are checked for null (lines 42 and 50), we should also check whether the attribute name is an empty string and, if so, treat it as None. For example:

        //case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
        case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null && !timeSource.getRowtimeAttribute.trim.equals("") =>
        ...
        //case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
        case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null && !timeSource.getProctimeAttribute.trim.equals("") =>

        Is that right?

        Thanks.

        fhueske Fabian Hueske added a comment -

        Hi Zhe Li,

        yes, we need to fix this issue in StreamTableSourceTable and not in StreamTableEnvironment.
        However, I don't think we should treat an empty string the same way as null. Instead I would throw a TableException if the string is empty.
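The rule agreed on here (null means "no attribute", a blank name is an error) can be captured in one small function. This is a minimal sketch: TimeAttributeNames.validate is a hypothetical helper, and TableException is a local stand-in for Flink's exception class, not the real one.

```scala
// Local stand-in for Flink's TableException (assumption: a plain RuntimeException suffices here).
class TableException(msg: String) extends RuntimeException(msg)

object TimeAttributeNames {
  /** Hypothetical helper:
    *  - null      => no attribute of this kind (None)
    *  - blank     => TableException
    *  - otherwise => Some(name)
    */
  def validate(kind: String, name: String): Option[String] =
    if (name == null) None
    else if (name.trim.isEmpty)
      throw new TableException(s"The name of the $kind attribute must not be empty.")
    else Some(name)
}
```

Checking for null first matters: calling trim on a null name would throw a NullPointerException instead of returning None.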

        lmalds Zhe Li added a comment -

        Fabian,

        I understand what you mean.

        So I changed the code in StreamTableSourceTable#getRowType to:

          val rowtime = tableSource match {
            // a non-null but blank name is an error; null still means "no rowtime attribute"
            case emptyStringTimeSource: DefinedRowtimeAttribute
                if emptyStringTimeSource.getRowtimeAttribute != null &&
                  emptyStringTimeSource.getRowtimeAttribute.trim.isEmpty =>
              throw TableException("The name of the rowtime attribute must not be empty.")

            case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
              val rowtimeAttribute = timeSource.getRowtimeAttribute
              Some((fieldCnt, rowtimeAttribute))
            case _ =>
              None
          }

          val proctime = tableSource match {
            // a non-null but blank name is an error; null still means "no proctime attribute"
            case emptyStringTimeSource: DefinedProctimeAttribute
                if emptyStringTimeSource.getProctimeAttribute != null &&
                  emptyStringTimeSource.getProctimeAttribute.trim.isEmpty =>
              throw TableException("The name of the proctime attribute must not be empty.")

            case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
              val proctimeAttribute = timeSource.getProctimeAttribute
              // the proctime field follows the rowtime field if one exists
              Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
            case _ =>
              None
          }
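The matching logic above can be exercised outside Flink with a self-contained Scala sketch. It assumes simplified local stand-ins for TableSource, the two time-attribute traits, and TableException, and the object TimeAttributeIndices is hypothetical. It shows how the time attributes are appended after the fieldCnt physical fields, with the proctime index shifted by one when a rowtime attribute is present.

```scala
// Simplified local stand-ins (assumption): only the getters used by the match are modeled.
trait TableSource
trait DefinedRowtimeAttribute { def getRowtimeAttribute: String }
trait DefinedProctimeAttribute { def getProctimeAttribute: String }
class TableException(msg: String) extends RuntimeException(msg)

object TimeAttributeIndices {
  /** Returns (index, name) for rowtime and proctime, placed after `fieldCnt` physical fields. */
  def extract(tableSource: TableSource, fieldCnt: Int)
      : (Option[(Int, String)], Option[(Int, String)]) = {
    val rowtime = tableSource match {
      case ts: DefinedRowtimeAttribute
          if ts.getRowtimeAttribute != null && ts.getRowtimeAttribute.trim.isEmpty =>
        throw new TableException("The name of the rowtime attribute must not be empty.")
      case ts: DefinedRowtimeAttribute if ts.getRowtimeAttribute != null =>
        Some((fieldCnt, ts.getRowtimeAttribute))
      case _ => None
    }
    val proctime = tableSource match {
      case ts: DefinedProctimeAttribute
          if ts.getProctimeAttribute != null && ts.getProctimeAttribute.trim.isEmpty =>
        throw new TableException("The name of the proctime attribute must not be empty.")
      case ts: DefinedProctimeAttribute if ts.getProctimeAttribute != null =>
        // shift by one if a rowtime field was appended first
        Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), ts.getProctimeAttribute))
      case _ => None
    }
    (rowtime, proctime)
  }
}
```

For a source with three physical fields and both attributes, rowtime lands at index 3 and proctime at index 4; with only a proctime attribute, proctime lands at index 3.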
        

        If there are no problems with this, could I fix the issue?

        Below is my Jira profile:

        Username: lmalds
        Full Name: Zhe Li

        Please grant me the permission to update issues.

        Thanks.

        fhueske Fabian Hueske added a comment -

        The fix looks good! Please open a pull request with it.

        I also gave you contributor permissions for the Flink JIRA.
        You can now also assign other issues to yourself.

        Thank you, Fabian

        lmalds Zhe Li added a comment -

        It's my pleasure.

        I have opened a pull request just now.

        Thank you.

        fhueske Fabian Hueske added a comment -

        Fixed for 1.3.1 with c7f6d0245cd0d47a99d9badcd43f1ebb8758125e
        Fixed for 1.4.0 with 850e4d913bf33d90409a078dab2fbc26bfa976ce


          People

          • Assignee:
            lmalds Zhe Li
            Reporter:
            twalthr Timo Walther
          • Votes:
            0
            Watchers:
            3

            Dates

            • Created:
              Updated:
              Resolved:
