Flink / FLINK-34991

Flink Operator ClassPath Race Condition Bug


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.7.2
    • Fix Version/s: None
    • Component/s: Kubernetes Operator
    • Labels: None
    • Environment: Standard Flink Operator with Flink Deployment.

      To recreate, just remove a critical SQL connector library from the bundled jar.
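      For context, a deployment of the kind described might look like the sketch below. Every name and value here is an assumption for illustration, not taken from the report; the relevant detail is that `last-state` upgrade mode is what makes the operator consult HA metadata when restoring.

      ```yaml
      # Illustrative FlinkDeployment (names/values are assumptions).
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: sql-job-example
      spec:
        image: my-registry/my-sql-job:latest  # bundled jar missing flink-connector-kafka
        flinkVersion: v1_17
        serviceAccount: flink
        jobManager:
          resource: { memory: "2048m", cpu: 1 }
        taskManager:
          resource: { memory: "2048m", cpu: 1 }
        job:
          jarURI: local:///opt/flink/usrlib/my-sql-job.jar
          upgradeMode: last-state  # restore path that requires HA metadata
      ```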

    Description

      Hello,

      I believe we've found a bug in the Job Managers managed by the Kubernetes Operator. There appears to be a race condition, or an incorrect conditional, where the operator checks for High Availability metadata instead of detecting a class-loading failure in the Job Manager.

      Example:
      When deploying a Flink SQL job, the Job Managers start in a failed state.
      Describing the FlinkDeployment returns the error message:

      RestoreFailed ... HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted.

      But upon further investigation, the actual error was a class-loading failure in the Job Manager. The Job Manager log contained:

      org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

      Available factory identifiers are:

      blackhole
      datagen
      filesystem
      print
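      This is the factory lookup that any table with `'connector' = 'kafka'` triggers. A table definition of that shape is sketched below; the schema and option values are illustrative, not from the report:

      ```sql
      -- Illustrative only: any table declaring 'connector' = 'kafka' forces the
      -- DynamicTableFactory lookup that fails when flink-connector-kafka is
      -- missing from the bundled jar.
      CREATE TABLE orders (
        order_id STRING,
        ts TIMESTAMP(3)
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
      );
      ```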

      There is also corresponding logging in the operator:

      ... Cannot discover a connector using option: 'connector'='kafka'
          at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:798)
          at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:772)
          at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:215)
          ... 52 more
      Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath ...

      I think the operator should surface this error in the CRD status, since the HA error is not the root cause.
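      The requested behavior could be sketched roughly as follows. This is not the operator's actual code; the function and parameter names are hypothetical, purely to illustrate preferring a concrete startup failure from the Job Manager over the generic HA-metadata message:

      ```python
      def pick_crd_error(jobmanager_errors, ha_metadata_available):
          """Hypothetical sketch: choose the error to surface in the CRD status.

          jobmanager_errors: exception summaries observed from the Job Manager.
          ha_metadata_available: whether the HA configmaps could be found.
          """
          # Prefer a concrete root cause (e.g. a class-loading
          # ValidationException) over the generic restore message.
          root_causes = [e for e in jobmanager_errors if "ValidationException" in e]
          if root_causes:
              return root_causes[0]
          if not ha_metadata_available:
              return ("RestoreFailed: HA metadata not available to restore "
                      "from last state.")
          return None
      ```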


      To recreate:

      All I did was remove `"org.apache.flink:flink-connector-kafka:$flinkConnectorKafkaVersion"` from my bundled jar, so the classpath was missing the Kafka connector. This was a Flink SQL job, which means the Job Manager starts before the classpath error is thrown; that seems to be the issue.
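      In a Gradle build, the reproduction amounts to deleting this one dependency line (the build-file shape is an assumption; only the coordinate string comes from the report):

      ```groovy
      dependencies {
          // Removing this line leaves the bundled jar without the Kafka
          // DynamicTableFactory, reproducing the misleading HA error above.
          implementation "org.apache.flink:flink-connector-kafka:$flinkConnectorKafkaVersion"
      }
      ```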


      People

        Assignee: Unassigned
        Reporter: Ryan van Huuksloot
        Votes: 0
        Watchers: 1
