Flink / FLINK-31326

Disabled source scaling breaks downstream scaling if source busyTimeMsPerSecond is 0

Details

    • Handle new-style and old-style sources equally well and remove option to disable scaling sources

    Description

      When 'scaling.sources.enabled'='false', the 'TARGET_DATA_RATE' of the source vertex is calculated as '(1000 / busyTimeMsPerSecond) * numRecordsOutPerSecond'. On the current main branch, this yields an infinite value if 'busyTimeMsPerSecond' is 0, and the infinite value also propagates to downstream operators.
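To make the arithmetic concrete, here is a minimal Java sketch of the formula quoted above. The class and method names are hypothetical and are not taken from the autoscaler. The sketch only shows how Java's IEEE 754 double arithmetic behaves when 'busyTimeMsPerSecond' is 0: division by 0.0 returns Infinity instead of throwing, and Infinity multiplied by 0.0 returns NaN.

```java
// Minimal sketch of the source TARGET_DATA_RATE formula from this issue:
//   (1000 / busyTimeMsPerSecond) * numRecordsOutPerSecond
// Names are hypothetical; this is not the autoscaler's actual code.
final class SourceTargetRate {

    static double targetDataRate(double busyTimeMsPerSecond, double numRecordsOutPerSecond) {
        // Java double division by 0.0 does not throw; with a positive numerator it returns Infinity.
        return (1000.0 / busyTimeMsPerSecond) * numRecordsOutPerSecond;
    }

    public static void main(String[] args) {
        // Source busy 500 ms per second while emitting 4 records/s.
        System.out.println(targetDataRate(500.0, 4.0));       // 8.0
        // Idle source: a busy time of 0 turns the estimate into Infinity.
        System.out.println(targetDataRate(0.0, 4.0));         // Infinity
        // Idle source with no output: Infinity * 0.0 is NaN.
        System.out.println(targetDataRate(0.0, 0.0));         // NaN
        // Anything derived from Infinity downstream stays Infinity.
        System.out.println(targetDataRate(0.0, 4.0) * 2.0);   // Infinity
    }
}
```

No exception is raised anywhere in this path, which is why the bad value silently reaches the downstream vertices instead of failing fast.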

      I'm not very familiar with the autoscaler code, but in my opinion it is quite unexpected that simply setting 'scaling.sources.enabled' to false causes such a behavioral change.

      With PR #543 for FLINK-30575 (https://github.com/apache/flink-kubernetes-operator/pull/543), scaling will happen even when 'busyTimeMsPerSecond' is 0. However, because that PR replaces a 'busyTimeMsPerSecond' of 0 with 1e-10, 'TARGET_DATA_RATE' becomes extremely high, which leads to unreasonably high parallelism for downstream operators.
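A rough Java sketch of why the 1e-10 substitution still misbehaves. Several parts are assumptions. The helper names are hypothetical, and the source is given a hypothetical output of 3.87 records/s. The parallelism rule is also a simplification, not the autoscaler's actual code: it scales the current parallelism by target rate / true processing rate, rounds up, and clamps the result to [1, max parallelism]. The sink inputs come from the logs in this issue: parallelism 4, TRUE_PROCESSING_RATE 5.0, MAX_PARALLELISM 12, and the source's OUTPUT_RATIO of 2.0.

```java
// Sketch of the 1e-10 busy-time substitution described for PR #543.
// requiredParallelism is an ASSUMED simplification of the autoscaler's logic, not its code.
final class EpsilonBusyTime {

    static final double EPSILON_BUSY_TIME = 1e-10; // substitute value quoted in the issue

    // Hypothetical helper: the source target-rate formula with 0 replaced by 1e-10.
    static double sourceTargetRate(double busyTimeMsPerSecond, double numRecordsOutPerSecond) {
        double busy = busyTimeMsPerSecond == 0.0 ? EPSILON_BUSY_TIME : busyTimeMsPerSecond;
        return (1000.0 / busy) * numRecordsOutPerSecond;
    }

    // Hypothetical helper: scale current parallelism by targetRate / trueProcessingRate,
    // round up, and clamp to [1, maxParallelism].
    static int requiredParallelism(int currentParallelism, double targetRate,
                                   double trueProcessingRate, int maxParallelism) {
        double scaleFactor = targetRate / trueProcessingRate;
        long wanted = (long) Math.ceil(currentParallelism * scaleFactor);
        return (int) Math.min(Math.max(wanted, 1L), maxParallelism);
    }

    public static void main(String[] args) {
        // Hypothetical idle source emitting 3.87 records/s with a reported busy time of 0.
        double sourceRate = sourceTargetRate(0.0, 3.87);
        // The sink receives OUTPUT_RATIO (2.0 in the logs) records per source record.
        double sinkTargetRate = sourceRate * 2.0;
        System.out.println(sourceRate); // on the order of 1e13 records/s
        // Sink values from the logs: parallelism 4, processing rate 5.0, max parallelism 12.
        System.out.println(requiredParallelism(4, sinkTargetRate, 5.0, 12)); // 12
    }
}
```

Under this simplified rule, the target rate ends up many orders of magnitude above the sink's processing rate. As a result, the sink is pushed straight to its maximum parallelism, matching the "unreasonably high parallelism" described above.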

      Metrics from the operator logs (source vertex: e5a72f353fc1e6bbf3bd96a41384998c, sink vertex: 51312116a3e504bccb3874fc80d5055e):

      'scaling.sources.enabled'='true':

      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: 5.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: 10.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: 3.8666666666666667
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: 3.8833333333333333
      
      jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 4.827299209321681
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.848351269098938
      jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: 10.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: 21.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: 7.733333333333333
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: 7.766666666666667

      'scaling.sources.enabled'='false':

      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.PARALLELISM.Current: 1.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.MAX_PARALLELISM.Current: 1.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Current: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_PROCESSING_RATE.Average: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.CATCH_UP_DATA_RATE.Current: 0.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_UP_RATE_THRESHOLD.Current: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Current: 2.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.OUTPUT_RATIO.Average: 2.0
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Current: Infinity
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TRUE_OUTPUT_RATE.Average: NaN
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Current: Infinity
      jobVertexID.e5a72f353fc1e6bbf3bd96a41384998c.TARGET_DATA_RATE.Average: NaN
      
      jobVertexID.51312116a3e504bccb3874fc80d5055e.PARALLELISM.Current: 4.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.MAX_PARALLELISM.Current: 12.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Current: 5.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TRUE_PROCESSING_RATE.Average: 4.980555555555556
      jobVertexID.51312116a3e504bccb3874fc80d5055e.CATCH_UP_DATA_RATE.Current: 0.0
      jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_UP_RATE_THRESHOLD.Current: NaN
      jobVertexID.51312116a3e504bccb3874fc80d5055e.SCALE_DOWN_RATE_THRESHOLD.Current: NaN
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Current: Infinity
      jobVertexID.51312116a3e504bccb3874fc80d5055e.TARGET_DATA_RATE.Average: NaN
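The logged values are consistent with the sink's TARGET_DATA_RATE being the source's TARGET_DATA_RATE multiplied by the source's OUTPUT_RATIO (2.0). The Java sketch below assumes that relationship, which is inferred from the numbers rather than taken from the operator code. It shows that the finite source rates of the first run reproduce the logged sink values, while the Infinity and NaN of the second run pass straight through. The class and method names are hypothetical.

```java
// Sketch: downstream TARGET_DATA_RATE = upstream TARGET_DATA_RATE * OUTPUT_RATIO.
// This rule is ASSUMED from the logged numbers, not copied from the autoscaler.
final class TargetRatePropagation {

    static double downstreamTargetRate(double upstreamTargetRate, double outputRatio) {
        return upstreamTargetRate * outputRatio;
    }

    public static void main(String[] args) {
        double outputRatio = 2.0; // source OUTPUT_RATIO in both runs

        // 'scaling.sources.enabled'='true': finite source rates give finite sink rates.
        System.out.println(downstreamTargetRate(3.8666666666666667, outputRatio)); // compare with logged 7.733333333333333
        System.out.println(downstreamTargetRate(3.8833333333333333, outputRatio)); // compare with logged 7.766666666666667

        // 'scaling.sources.enabled'='false': Infinity and NaN propagate unchanged.
        System.out.println(downstreamTargetRate(Double.POSITIVE_INFINITY, outputRatio)); // Infinity
        System.out.println(downstreamTargetRate(Double.NaN, outputRatio));               // NaN
    }
}
```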

      My guess is that 'scaling.sources.enabled' exists to support connectors that do not expose `pendingRecords`. However, setting it to false also negatively affects sources that do expose it, such as existing Kafka sources, and users cannot anticipate this from the documentation.

      I think this behavior is worth documenting. Alternatively, if anyone has a suggested solution, I would gladly look into implementing it.

            People

              mxm Maximilian Michels
              mateczagany Mate Czagany