  1. Flink
  2. FLINK-6655

Misleading error message when HistoryServer path is empty

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: History Server
    • Labels:
      None

      Description

      If the HistoryServer option jobmanager.archive.fs.dir is set to e.g. file://, the following exception is thrown. It mentions checkpoints, which is misleading.

      java.lang.IllegalArgumentException: Cannot use the root directory for checkpoints.
      	at org.apache.flink.runtime.state.filesystem.FsStateBackend.validateAndNormalizeUri(FsStateBackend.java:358)
      	at org.apache.flink.runtime.jobmanager.MemoryArchivist.org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles(MemoryArchivist.scala:201)
      	at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$handleMessage$1.applyOrElse(MemoryArchivist.scala:107)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
      	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
      	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
      	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
      	at org.apache.flink.runtime.jobmanager.MemoryArchivist.aroundReceive(MemoryArchivist.scala:65)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

          Activity

          mingleizhang mingleizhang added a comment -

          Timo Walther, it would help if you could post more of the stack trace or other information, e.g. a stack trace that refers to the HistoryServer. Looking at the following code, I don't think FsStateBackend is the right place for the fix, so I need more stack trace output if you can provide it.

          if (path.length() == 0 || path.equals("/")) {
          	throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
          }
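
To see why the message is misleading, the quoted check can be exercised in isolation. The following is a minimal sketch in plain Java, not Flink code: the class `RootPathCheckDemo` and the method `check` are hypothetical names, and only the condition and the message are taken from the snippet above. It uses `file:///`, whose path component is `/`, as a stand-in for an archive directory setting that points at the root.

```java
import java.net.URI;

// Hypothetical stand-alone demo; only the condition and message come from the quoted snippet.
final class RootPathCheckDemo {

    // Mirrors the quoted check: an empty path or "/" is rejected.
    static void check(URI uri) {
        String path = uri.getPath();
        if (path.length() == 0 || path.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
    }

    public static void main(String[] args) {
        try {
            // Imagine this URI came from the archive directory option, not a checkpoint setting.
            check(URI.create("file:///"));
        } catch (IllegalArgumentException e) {
            // The message still talks about checkpoints, whoever the caller is.
            System.out.println(e.getMessage());
        }
    }
}
```

The check itself is reasonable for an archive directory; only the wording ties it to one caller.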
          
          Zentol Chesnay Schepler added a comment -

          The stack trace is sufficient. The basic problem is that the HistoryServer code (to which the MemoryArchivist partly belongs) reuses some utility methods from the FsStateBackend to verify the validity of paths. The error messages were written with the FsStateBackend in mind and aren't suited for general-purpose use.

          As such, we either have to replace the call (i.e. by copying the method somewhere else) or modify the method to provide more general error messages. I would probably prefer the first option, even if it results in code duplication, for the sake of keeping the more specific error messages for the FsStateBackend.
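
The second option, generalizing the shared method, could look roughly like the sketch below. It only illustrates the trade-off and is not Flink code: `PathValidation`, `requireNonRootPath`, and the `purpose` parameter are hypothetical. Each caller passes a label so the message names the right feature, at the cost of a less specific message than a dedicated copy could give.

```java
import java.net.URI;

// Hypothetical shared helper; names are illustrative and not part of Flink.
final class PathValidation {

    // Rejects missing, empty, or root paths and names the caller's purpose in the message.
    static String requireNonRootPath(URI uri, String purpose) {
        String path = uri.getPath();
        if (path == null || path.isEmpty() || path.equals("/")) {
            throw new IllegalArgumentException(
                    "Cannot use the root directory for " + purpose + ".");
        }
        return path;
    }

    public static void main(String[] args) {
        // A state backend would pass "checkpoints", an archivist "storing job archives".
        System.out.println(requireNonRootPath(URI.create("file:///tmp/cp"), "checkpoints"));
        try {
            requireNonRootPath(URI.create("file:///"), "storing job archives");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```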

          mingleizhang mingleizhang added a comment -

          Exactly. That is why I said FsStateBackend is not a suitable place for the fix. I would either copy the method to MemoryArchivist and convert it to Scala code, or create a new class containing the method under the org.apache.flink.runtime.util package. I would prefer the first one.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhangminglei opened a pull request:

          https://github.com/apache/flink/pull/4156

          FLINK-6655 Add validateAndNormalizeUri method to MemoryArchivist

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhangminglei/flink flink-6655

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4156.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4156


          commit 700453627f8d922b9733d14f4953c9827979d79b
          Author: zhangminglei <zml13856086071@163.com>
          Date: 2017-06-21T12:45:55Z

          FLINK-6655 Add validateAndNormalizeUri method to MemoryArchivist


          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/4156

          cc @twalthr @zentol Please help review. Thank you very much.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r123980821

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          — End diff –

          -> "for storing job archives."

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r123980754

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          — End diff –

          Again, include the config option key in the message. Also, it should say "to store the job archives is null,"

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r123980868

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          — End diff –

          -> "for storing job archives."

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r123980977

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          we don't need this branch, as we only access the path from the jobmanager.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r123980694

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          — End diff –

          Let's include the config option key in the error message.
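
One way to act on this is to keep the option key in a constant and build it into each message. The sketch below is plain Java with hypothetical names (`ArchiveMessages`, `ARCHIVE_DIR_KEY`, `checkScheme`); the key `jobmanager.archive.fs.dir` is the one named in the issue description, and the exact wording is only a suggestion.

```java
import java.net.URI;

// Hypothetical helper; not the code that was merged for this issue.
final class ArchiveMessages {

    // Option key from the issue description; the constant name is an assumption.
    static final String ARCHIVE_DIR_KEY = "jobmanager.archive.fs.dir";

    // Fails with a message that points at the option the user has to fix.
    static String checkScheme(URI archiveUri) {
        String scheme = archiveUri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException(
                    "The scheme (hdfs://, file://, etc) of '" + ARCHIVE_DIR_KEY + "' is null. "
                            + "Please specify the file system scheme explicitly in the URI.");
        }
        return scheme;
    }

    public static void main(String[] args) {
        try {
            checkScheme(URI.create("/tmp/archive")); // no scheme given
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Naming the key tells the user which entry in the configuration to change, which the original checkpoint-oriented message could not do.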

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124221227

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          Thanks, that's true. Only FsStateBackend needs this branch to access the path from the file system.
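
With the branch removed, every URI goes through the file system lookup, so an unknown scheme fails up front instead of being passed through unchecked. The sketch below imitates that control flow in plain Java. It is a stand-in built on assumptions, not the PR's code: `ArchivePathValidator`, `KNOWN_SCHEMES`, and the scheme set are hypothetical, and a real implementation would ask Flink's file system layer rather than a fixed set.

```java
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the simplified validation; not Flink code.
final class ArchivePathValidator {

    // Placeholder for a file system registry; the contents are made up for the example.
    static final Set<String> KNOWN_SCHEMES = new HashSet<>(Arrays.asList("file", "hdfs"));

    static URI validate(URI uri) throws IOException {
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        String path = uri.getPath();
        if (path == null || path.isEmpty() || path.equals("/")) {
            throw new IllegalArgumentException(
                    "Cannot use the root directory for storing job archives.");
        }
        // No "skip checks for unsupported schemes" branch: the lookup always runs.
        if (!KNOWN_SCHEMES.contains(uri.getScheme())) {
            throw new IOException("No file system found for scheme '" + uri.getScheme() + "'.");
        }
        return uri.normalize();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(validate(URI.create("file:///tmp/archive")).getPath());
    }
}
```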

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124224512

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          I think both the if and else branches can be deleted. If I am wrong, please let me know.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124229611

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          It should throw an exception if the scheme isn't supported.
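
A minimal sketch of what rejecting an unsupported scheme could look like, assuming a hypothetical `assumedSupportedSchemes` set as a stand-in for `FileSystem.isFlinkSupportedScheme` and returning the plain `java.net.URI` instead of a Flink `Path`. The object name, the method name and the wording of the last message are illustrative only and are not taken from the pull request.

```scala
import java.net.URI

object ArchivePathCheck {
  // Hypothetical stand-in for FileSystem.isFlinkSupportedScheme; the real set of
  // supported schemes is defined by Flink and is not reproduced here.
  val assumedSupportedSchemes: Set[String] = Set("file", "hdfs")

  // Validates the archive URI and returns it unchanged if every check passes.
  def validate(archivePathUri: URI): URI = {
    val scheme = archivePathUri.getScheme
    val path = archivePathUri.getPath

    if (scheme == null) {
      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
        "Please specify the file system scheme explicitly in the URI.")
    }
    if (path == null) {
      throw new IllegalArgumentException("The path to store the archive job is null. " +
        "Please specify a directory path for archive.")
    }
    if (path.length == 0 || path == "/") {
      throw new IllegalArgumentException("Cannot use the root directory for archive.")
    }
    // Fail instead of silently accepting a scheme that cannot be handled.
    if (!assumedSupportedSchemes.contains(scheme)) {
      throw new IllegalArgumentException("Unsupported file system scheme: " + scheme)
    }
    archivePathUri
  }
}
```

With these assumptions, `ArchivePathCheck.validate(URI.create("file:///tmp/archive"))` passes, while `file:///` (root directory) and `s4://bucket/dir` (scheme not in the assumed set) are both rejected with an `IllegalArgumentException`.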

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124251147

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          I am not sure which schemes Flink supports. I only know the existing schemes that the code mentions, ```hdfs``` and ```file```. If I knew which schemes Flink supports, I would probably fix the code with something like ```if (scheme.startsWith("hdfs") or scheme.startsWith("file"))```. Please check the newest code again and give me some advice. :XD

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124253131

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          You can also just use `FileSystem.isFlinkSupportedScheme`, which was used in the original method.

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124259876

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,75 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI.")
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the archive job is null. " +
          + "Please specify a directory path for archive.")
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for archive.")
          + }
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + new Path(archivePathUri)
          + }
          + else {
          — End diff –

          Ohhh, yeah. What a silly mistake I made. I can check which schemes are supported with ```FileSystem.isFlinkSupportedScheme```. I have updated the code, please check again.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124266875

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,47 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the job archives is null. " +
          + "Please specify a directory path for storing job archives. and the URI is: " + archivePathUri)
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
          + }
          +
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + throw new IllegalArgumentException("Cannot use the " + archivePathUri.getScheme + " scheme, only hdfs, " +
          — End diff –

          This is bound to be outdated at some point, so let's re-use an exception from the FileSystem class:
          ```
          "No file system found with scheme " + scheme + ", referenced in file URI '" + archivePathUri.toString() + "'."
          ```
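
As a rough illustration of the suggested wording, the sketch below builds that message from a URI without naming any particular set of schemes, so it does not go stale when supported schemes change. The object and method names are hypothetical, and only `java.net.URI` is used rather than any Flink class.

```scala
import java.net.URI

object SchemeErrors {
  // Hypothetical helper: assembles the message text quoted above from the URI itself.
  def noFileSystemMessage(archivePathUri: URI): String =
    "No file system found with scheme " + archivePathUri.getScheme +
      ", referenced in file URI '" + archivePathUri.toString + "'."
}
```

For example, `SchemeErrors.noFileSystemMessage(URI.create("s4://bucket/dir"))` yields `No file system found with scheme s4, referenced in file URI 's4://bucket/dir'.`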

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4156#discussion_r124272868

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala —
          @@ -255,4 +255,47 @@ class MemoryArchivist(
          graphs.remove(jobID)
          }
          }
          +
          + /**
          + * Checks and normalizes the archive path URI. This method first checks the validity of the
          + * URI (scheme, path, availability of a matching file system) and then normalizes the URL
          + * to a path.
          + *
          + * If the URI does not include an authority, but the file system configured for the URI has an
          + * authority, then the normalized path will include this authority.
          + *
          + * @param archivePathUri The URI to check and normalize.
          + * @return a normalized URI as a Path.
          + *
          + * @throws IllegalArgumentException Thrown, if the URI misses schema or path.
          + * @throws IOException Thrown, if no file system can be found for the URI's scheme.
          + */
          + @throws[IOException]
          + private def validateAndNormalizeUri(archivePathUri: URI): Path = {
          + val scheme = archivePathUri.getScheme
          + val path = archivePathUri.getPath
          +
          + // some validity checks
          + if (scheme == null) {
          + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
          + "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
          + }
          +
          + if (path == null) {
          + throw new IllegalArgumentException("The path to store the job archives is null. " +
          + "Please specify a directory path for storing job archives. and the URI is: " + archivePathUri)
          + }
          +
          + if (path.length == 0 || path == "/") {
          + throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
          + }
          +
          + if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
          + // skip verification checks for non-flink supported filesystem
          + // this is because the required filesystem classes may not be available to the flink client
          + throw new IllegalArgumentException("Cannot use the " + archivePathUri.getScheme + " scheme, only hdfs, " +
          — End diff –

          I have updated the code. Thanks @zentol for the VERY careful review. I really appreciate it!

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4156

          will try this out and merge it afterwards.

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/4156

          Nice ~

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/4156

          @zentol If you don't mind, could you send me your sample test code via my personal email, ```18717838093@163.com```? Next time I can test it myself.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4156

          For a local test I wouldn't use test code.

          build Flink -> mess with HistoryServer config -> start HistoryServer -> see what happens

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/4156

          Okay, let me try.

          githubbot ASF GitHub Bot added a comment -

          Github user zhangminglei commented on the issue:

          https://github.com/apache/flink/pull/4156

          BTW, what are your program arguments? Do they start with --configDir ...?

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4156

          Zentol Chesnay Schepler added a comment -

          1.3: d02e688e72c3d7a9a91a85f5816209a5d8d9aa60
          1.4: 39562691b160e3061794ac6605b5a9c1f031b548


            People

            • Assignee:
              mingleizhang mingleizhang
              Reporter:
              twalthr Timo Walther