FLINK-5975

Mesos should support adding volumes to launched taskManagers

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Mesos
    • Labels:
      None

      Description

      Flink needs access to shared storage.

      In many cases, this is HDFS, but it would be nice to also support file URIs on a mounted NFS, for example.

      Mesos exposes APIs for adding volumes, so it should be relatively simple to add this.

      As an example, here is the Spark code for supporting volumes: https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala#L35

          Activity

          till.rohrmann Till Rohrmann added a comment -

          Added via 4ef9672a9da78bc4ce02e56d9ecdcc546da23e42

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3481

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3481

          Changes look good to me. Will rebase the PR and then merge it.

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          @tillrohrmann this is the PR I mentioned to you during Flink Forward, if you get a chance to look.

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on the issue:

          https://github.com/apache/flink/pull/3481

          @zentol are you able to get this across the finish line?

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          @EronWright good suggestion regarding containerInfo without an image name; I confirmed in the Mesos docs that it should work that way.

          Lemme know if there is anything else!

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r108790880

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---
          @@ -162,11 +182,65 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig)

          { throw new IllegalConfigurationException("invalid container type: " + containerTypeString); }

          + Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
          + List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
          +
          return new MesosTaskManagerParameters(
          cpus,
          containerType,
          Option.apply(imageName),
          - containeredParameters);
          + containeredParameters,
          + containerVolumes);
          + }
          +
          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : specs) {
          + if (s.trim().isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + String[] parts = s.split(":");
          + switch (parts.length) {
          + case 1:
          + vol.setContainerPath(parts[0]);
          + break;
          + case 2:
          + try {
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
          + vol.setMode(mode)
          + .setContainerPath(parts[0]);
          + } catch (IllegalArgumentException e) {
          --- End diff --

          Totally agree, it is strange. But this is the same spec that the Docker CLI and the Spark Mesos framework use. It isn't ideal and is definitely something of a sharp edge, but it seemed best to follow the same standard.

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r105825035

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
          @@ -259,9 +262,11 @@ public String toString()

          { throw new IllegalStateException("unsupported container type"); }

          if(containerInfo != null) {
          + containerInfo.addAllVolumes(params.containerVolumes());
          --- End diff --

          There's an unnecessary restriction here, that volumes may be used only if a container image is also used. I bet you can use volumes with the Mesos containerizer without using an image. The code in the `case MESOS:` block should be reorganized to always set `containerInfo = Protos.ContainerInfo.newBuilder()`. This way, `containerInfo` will never be null.
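
The reorganization suggested in this comment could look roughly like the sketch below. It is only an illustration under stated assumptions: `ContainerInfoSketch`, `ContainerInfo`, and `ContainerType` are hypothetical stand-ins written for this example, not the Mesos `Protos` builders or the actual Flink code, and treating the image as optional for the Mesos containerizer follows the comment's guess rather than anything verified here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not the Mesos Protos classes.
final class ContainerInfoSketch {

    enum ContainerType { MESOS, DOCKER }

    static final class ContainerInfo {
        String image;                                    // stays null when no image is configured
        final List<String> volumes = new ArrayList<>();  // container paths, kept as plain strings
    }

    static ContainerInfo build(ContainerType type, String imageName, List<String> volumes) {
        // Create the container info unconditionally so that it can never be null.
        ContainerInfo info = new ContainerInfo();
        switch (type) {
            case MESOS:
                // Assumption of this sketch: the image is optional for the Mesos containerizer.
                if (imageName != null) {
                    info.image = imageName;
                }
                break;
            case DOCKER:
                info.image = imageName;
                break;
            default:
                throw new IllegalStateException("unsupported container type");
        }
        // Volumes are attached whether or not an image was set.
        info.volumes.addAll(volumes);
        return info;
    }

    public static void main(String[] args) {
        ContainerInfo info = build(ContainerType.MESOS, null, Arrays.asList("/mnt/nfs"));
        System.out.println(info.image + " " + info.volumes);
    }
}
```

With the object always created, the later null check around attaching volumes is no longer needed.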

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r105826378

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---
          @@ -162,11 +182,65 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig)

          { throw new IllegalConfigurationException("invalid container type: " + containerTypeString); }

          + Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
          + List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
          +
          return new MesosTaskManagerParameters(
          cpus,
          containerType,
          Option.apply(imageName),
          - containeredParameters);
          + containeredParameters,
          + containerVolumes);
          + }
          +
          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : specs) {
          + if (s.trim().isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + String[] parts = s.split(":");
          + switch (parts.length) {
          + case 1:
          + vol.setContainerPath(parts[0]);
          + break;
          + case 2:
          + try {
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
          + vol.setMode(mode)
          + .setContainerPath(parts[0]);
          + } catch (IllegalArgumentException e) {
          --- End diff --

          I gather that this code treats a two-part spec as `container:mode` first, then falls back to `host:container`. Just curious, is there some precedent for that? I'm not sure it makes sense; for example, `/data:ro` would create an empty read-only volume, but what good is that?
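
To make the two-part behavior described in this comment concrete, here is a minimal, self-contained sketch of the same parsing order: a two-part spec is first tried as `container:mode`, and only if the second part is not a valid mode is it read as `host:container`. The names `VolumeSpecSketch`, `Volume`, and `Mode` are hypothetical stand-ins for this example; the real code in the diff builds `Protos.Volume` objects instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Illustrative stand-in for the parsing logic quoted in the diff; not the Flink class.
final class VolumeSpecSketch {

    enum Mode { RO, RW }

    /** Plain holder standing in for Protos.Volume. */
    static final class Volume {
        final String hostPath;       // null when the spec names no host path
        final String containerPath;
        final Mode mode;

        Volume(String hostPath, String containerPath, Mode mode) {
            this.hostPath = hostPath;
            this.containerPath = containerPath;
            this.mode = mode;
        }
    }

    /** Parses a comma-delimited list of [host_path:]container_path[:RO|RW] specs. */
    static List<Volume> parse(String rawVolumes) {
        List<Volume> vols = new ArrayList<>();
        if (rawVolumes == null) {
            return vols;
        }
        for (String s : rawVolumes.split(",")) {
            String spec = s.trim();
            if (spec.isEmpty()) {
                continue;
            }
            String[] parts = spec.split(":");
            switch (parts.length) {
                case 1:
                    // container_path only; mode defaults to RW
                    vols.add(new Volume(null, parts[0], Mode.RW));
                    break;
                case 2:
                    try {
                        // First interpretation: container_path:mode
                        Mode mode = Mode.valueOf(parts[1].trim().toUpperCase(Locale.ROOT));
                        vols.add(new Volume(null, parts[0], mode));
                    } catch (IllegalArgumentException e) {
                        // Fallback: host_path:container_path with the default mode
                        vols.add(new Volume(parts[0], parts[1], Mode.RW));
                    }
                    break;
                case 3:
                    // host_path:container_path:mode; an unknown mode throws IllegalArgumentException
                    vols.add(new Volume(parts[0], parts[1],
                            Mode.valueOf(parts[2].trim().toUpperCase(Locale.ROOT))));
                    break;
                default:
                    throw new IllegalArgumentException("volume specification is invalid, given: " + spec);
            }
        }
        return vols;
    }
}
```

In this sketch, `/data:ro` yields a read-only volume with no host path, which is exactly the case the comment questions, while `/host:/container` falls through to the host-to-container mapping.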

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          @EronWright @zentol minor bump on this... are there any other steps to get this on the path to being merged?
          I don't want to let this hang for so long that I forget about it.

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          Okay, just moved things around. I agree that it's a better place, but I was hesitant at first as we weren't building any other Mesos objects earlier. Hopefully this still works. I think that, going forward, when adding support for other container options such as additional network ports or additional URIs to download, that would be a better place to put them.

          Also, I have one other small commit; it wasn't working because the volume info wasn't getting attached soon enough.

          This now is working for me in my dev env, which is 1.2.0 with these patches applied

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104838693

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
          @@ -262,9 +264,61 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(buildVolumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : specs) {
          + if (s.trim().isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + String[] parts = s.split(":");
          + switch (parts.length) {
          + case 1:
          + vol.setContainerPath(parts[0]);
          + break;
          + case 2:
          + try {
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
          + vol.setMode(mode)
          + .setContainerPath(parts[0]);
          + } catch (IllegalArgumentException e) {
          + vol.setHostPath(parts[0])
          + .setContainerPath(parts[1]);
          + }
          + break;
          + case 3:
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[2].trim().toUpperCase());
          + vol.setMode(mode)
          + .setHostPath(parts[0])
          + .setContainerPath(parts[1]);
          + break;
          + default:
          + throw new IllegalArgumentException("volume specification is invalid, given: " + s);
          --- End diff --

          👍 will move it there

          githubbot ASF GitHub Bot added a comment -

          Github user EronWright commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104838231

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
          @@ -262,9 +264,61 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(buildVolumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : specs) {
          + if (s.trim().isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + String[] parts = s.split(":");
          + switch (parts.length) {
          + case 1:
          + vol.setContainerPath(parts[0]);
          + break;
          + case 2:
          + try {
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[1].trim().toUpperCase());
          + vol.setMode(mode)
          + .setContainerPath(parts[0]);
          + } catch (IllegalArgumentException e) {
          + vol.setHostPath(parts[0])
          + .setContainerPath(parts[1]);
          + }
          + break;
          + case 3:
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts[2].trim().toUpperCase());
          + vol.setMode(mode)
          + .setHostPath(parts[0])
          + .setContainerPath(parts[1]);
          + break;
          + default:
          + throw new IllegalArgumentException("volume specification is invalid, given: " + s);
          --- End diff --

          Ideally this validation would occur eagerly, i.e. in `MesosTaskManagerParameters::create`. Flink emphasizes eager validation.
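
As a rough illustration of what eager validation means here, the sketch below checks the volume string inside a static `create` factory, so a malformed value fails when the parameters are built rather than later when a worker is launched. `EagerParamsSketch` and its methods are hypothetical names for this example and only check the number of `:`-separated parts; they are not Flink's `MesosTaskManagerParameters`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parameter holder for illustration; not Flink's MesosTaskManagerParameters.
final class EagerParamsSketch {

    private final List<String> volumeSpecs;

    private EagerParamsSketch(List<String> volumeSpecs) {
        this.volumeSpecs = Collections.unmodifiableList(volumeSpecs);
    }

    /** Validates the raw configuration value up front, so bad input fails here. */
    static EagerParamsSketch create(String rawVolumes) {
        List<String> specs = new ArrayList<>();
        if (rawVolumes != null) {
            for (String spec : rawVolumes.split(",")) {
                String trimmed = spec.trim();
                if (trimmed.isEmpty()) {
                    continue;
                }
                // A spec has one to three ':'-separated parts: [host_path:]container_path[:RO|RW]
                int parts = trimmed.split(":").length;
                if (parts < 1 || parts > 3) {
                    throw new IllegalArgumentException(
                            "volume specification is invalid, given: " + trimmed);
                }
                specs.add(trimmed);
            }
        }
        return new EagerParamsSketch(specs);
    }

    /** Already-validated specs; code that launches a task can use them without re-checking. */
    List<String> volumeSpecs() {
        return volumeSpecs;
    }
}
```

The design point is that any code holding an `EagerParamsSketch` can assume the specs are well-formed, because an invalid one could never have been constructed.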

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          @zentol thanks for the quality reviews. The code reads a lot cleaner with the mode parsing being handled by the enum. My Java is pretty rusty after being all in Scala of late.

          I think that handles everything. Hopefully the reasoning for not using a regex makes sense.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104795283

          --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          --- End diff --

          This method could be made static to prevent side-effects.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104795107

          --- Diff: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java ---
          @@ -0,0 +1,75 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.mesos.runtime.clusterframework;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.mesos.util.MesosArtifactResolver;
          +import org.apache.flink.runtime.clusterframework.ContainerSpecification;
          +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
          +import org.apache.flink.util.TestLogger;
          +import org.apache.mesos.Protos;
          +import org.junit.Test;
          +import org.mockito.Mock;
          +import org.mockito.Mockito;
          +import scala.Option;
          +
          +import java.util.List;
          +
          +import static org.junit.Assert.*;
          +
          +public class LaunchableMesosWorkerTest extends TestLogger {
          + MesosTaskManagerParameters params = Mockito.mock(MesosTaskManagerParameters.class);
          + MesosArtifactResolver resolver = Mockito.mock(MesosArtifactResolver.class);
          + ContainerSpecification containerSpec = Mockito.mock(ContainerSpecification.class);
          + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue("1234").build();
          + LaunchableMesosWorker worker = new LaunchableMesosWorker(resolver, params, containerSpec, taskId);
          +
          + @Test
          + public void volumes() throws Exception {
          — End diff –

          By convention, these method names should begin with "test", i.e. "testVolumes()".

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104794320

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          — End diff –

          This method should be renamed to something like ```buildVolumes```, ```toVolumes``` or such.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104793521

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          — End diff –

          eh nvm; a messed up mode value will always cause issues. It's just that we can actually detect it if parts.size == 3.

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104790634

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          — End diff –

          correct, http://mesos.apache.org/api/latest/java/org/apache/mesos/Protos.Volume.Builder.html#setMode(org.apache.mesos.Protos.Volume.Mode).

          RW seems like a sane default

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104790136

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          — End diff –

          scala has ruined me... I

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104780760

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          — End diff –

          we don't need to wrap this in a list, we can just use a String[].

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104780871

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          — End diff –

          Is it required to call ```setMode``` at least once?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104780664

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch (parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).trim().toUpperCase();
          — End diff –

          Similar to below, you can use ```Protos.Volume.Mode.valueOf```. Wrap it in a try-catch block; if an IllegalArgumentException is thrown, we'll assume that we're dealing with the path and not the mode.
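
          The try-catch approach suggested above could look roughly like the following sketch. It is an illustration only, not the code from the PR: the `Mode` enum is a hypothetical stand-in for `Protos.Volume.Mode` so the snippet compiles without the Mesos library, and `ModeOrPathSketch` and `tryParseMode` are made-up names.

```java
import java.util.Locale;

// Hypothetical stand-in for org.apache.mesos.Protos.Volume.Mode (assumption, not the real class).
enum Mode { RW, RO }

final class ModeOrPathSketch {

    /**
     * Tries to interpret the token as a volume mode.
     * Returns null when the token is not a valid mode, in which case the
     * caller would treat the token as a path instead.
     */
    static Mode tryParseMode(String token) {
        try {
            // Enum.valueOf throws IllegalArgumentException for unknown constant names.
            return Mode.valueOf(token.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParseMode("ro"));        // a mode token
        System.out.println(tryParseMode("/mnt/data")); // not a mode, so treated as a path
    }
}
```

          In the two-part case, a null result would mean "host_path:container_path", while a non-null result would mean "container_path:mode".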

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104780260

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch (parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).trim().toUpperCase();
          + if (modeOrPath.equals("RW")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else if (modeOrPath.equals("RO")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RO);
          + } else {
          + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath);
          + }
          + break;
          + case 3:
          + Protos.Volume.Mode mode = Protos.Volume.Mode.valueOf(parts.get(2).trim().toUpperCase());
          + if (mode.equals(Protos.Volume.Mode.RW)) {
          + vol.setMode(Protos.Volume.Mode.RW);
          — End diff –

          this block can be simplified to ```vol.setMode(mode)```.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104781670

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString()

          { taskInfo.setContainer(containerInfo); }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          + String s = specs[i];
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          — End diff –

          The parsing logic is a bit brittle and inconsistent: if parts.size == 3 and the mode is invalid, an IllegalArgumentException (IAE) is thrown; if parts.size == 2 and the mode is invalid, we'll assign a bogus path.

          A regex may be more appropriate.
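
          One possible shape for a regex-based parser of a single `[host_path:]container_path[:RO|RW]` spec is sketched below. This is an assumption-laden illustration, not the code that was merged: `VolumeSpecSketch`, `VolumeSpec` and the nested `Mode` enum are hypothetical stand-ins for the Mesos `Protos.Volume` types, and the sketch assumes paths never contain `:`.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VolumeSpecSketch {

    // Hypothetical stand-in for Protos.Volume.Mode (assumption, not the real class).
    enum Mode { RW, RO }

    // Hypothetical holder for one parsed spec; hostPath is null when it was omitted.
    static final class VolumeSpec {
        final String hostPath;
        final String containerPath;
        final Mode mode;

        VolumeSpec(String hostPath, String containerPath, Mode mode) {
            this.hostPath = hostPath;
            this.containerPath = containerPath;
            this.mode = mode;
        }
    }

    // group(1): optional host path, group(2): container path, group(3): optional mode.
    // The negative lookahead stops a trailing "RO"/"RW" from being taken as the container path.
    private static final Pattern SPEC = Pattern.compile(
        "^(?:([^:]+):)?(?!(?:RO|RW)$)([^:]+)(?::(RO|RW))?$",
        Pattern.CASE_INSENSITIVE);

    static VolumeSpec parse(String spec) {
        Matcher m = SPEC.matcher(spec.trim());
        if (!m.matches()) {
            // Every malformed spec fails the same way, whatever its number of parts.
            throw new IllegalArgumentException("Invalid volume spec: " + spec);
        }
        // RW is the default mode, as in the diff under review.
        Mode mode = m.group(3) == null
            ? Mode.RW
            : Mode.valueOf(m.group(3).toUpperCase(Locale.ROOT));
        return new VolumeSpec(m.group(1), m.group(2), mode);
    }
}
```

          With this pattern, a three-part spec with an unknown mode is rejected rather than half-applied. A two-part spec such as `a:xx` remains inherently ambiguous and is read as host path plus container path.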

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104779998

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java —
          @@ -115,6 +122,11 @@ public ContaineredTaskManagerParameters containeredParameters() {
          return containeredParameters;
          }

          + /**
          + * Get the container volumes string
          + */
          + public Option<String> containerVolumes() { return containerVolumes; }

          — End diff –

          Please move the return statement into a separate line.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104779885

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +265,67 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (int i = 0; i < specs.length; i++) {
          — End diff –

          This can be simplified to `for (String s : specs)`.

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on the issue:

          https://github.com/apache/flink/pull/3481

          @zentol thanks for the review! I think I addressed all those, updated via an amend. Wasn't sure if you want changes as new commits or not.

          Also, the build passed locally for me, but I haven't seen it pass on Travis yet; it keeps failing in flink core for what appear to be totally unrelated reasons (guessing flaky tests?).

          Let me know if there is anything else!

          githubbot ASF GitHub Bot added a comment -

          Github user addisonj commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104752423

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -131,9 +134,7 @@ public int getPorts() {
          }

          @Override

          - public List<? extends ConstraintEvaluator> getHardConstraints() {
          - return null;
          - }
          + public List<? extends ConstraintEvaluator> getHardConstraints() { return null; }
          — End diff –

          oops, intellij tricking me

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104735059

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +263,71 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : Arrays.asList(specs)) {
          — End diff –

          You can just loop over the String[].

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104736408

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +263,71 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : Arrays.asList(specs)) {
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch(parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).toLowerCase().trim();
          + if (modeOrPath.equals("rw")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else if (modeOrPath.equals("ro")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RO);
          + } else {
          + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath);
          + }
          + break;
          + case 3:
          + String mode = parts.get(2).toLowerCase().trim();
          + if (!mode.equals("rw") && !mode.equals("ro")) {
          — End diff –

          Why not just use Protos.Volume.Mode.valueOf(mode)?
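
A minimal sketch of what the `valueOf` route could look like, under stated assumptions: the nested `Mode` enum is a local stand-in for `Protos.Volume.Mode`, and `ModeParsing` and `parseMode` are hypothetical names. Because `valueOf(String)` on a Java enum is case-sensitive and throws `IllegalArgumentException` for unknown names, the input has to be upper-cased rather than lower-cased as in the diff above.

```java
import java.util.Locale;

/** Hypothetical helper, not part of Flink or Mesos. */
public class ModeParsing {

    /** Local stand-in for Protos.Volume.Mode so the sketch compiles without Mesos. */
    public enum Mode { RW, RO }

    public static Mode parseMode(String raw) {
        try {
            // valueOf matches the constant name exactly, so normalize case first.
            return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Re-throw with the offending value so the user can locate it.
            throw new IllegalArgumentException("Invalid volume mode: '" + raw + "'", e);
        }
    }
}
```

This removes the separate `rw`/`ro` string comparisons, and any mode added to the enum later would be accepted without further changes.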

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104734849

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -131,9 +134,7 @@ public int getPorts() {
          }

          @Override

          - public List<? extends ConstraintEvaluator> getHardConstraints() {
          - return null;
          - }
          + public List<? extends ConstraintEvaluator> getHardConstraints() { return null; }
          — End diff –

          Please revert this change.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104736962

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +263,71 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : Arrays.asList(specs)) {
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch(parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).toLowerCase().trim();
          + if (modeOrPath.equals("rw")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else if (modeOrPath.equals("ro")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RO);
          + } else {
          + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath);
          + }
          + break;
          + case 3:
          + String mode = parts.get(2).toLowerCase().trim();
          + if (!mode.equals("rw") && !mode.equals("ro")) {
          + throw new IllegalArgumentException("invalid mode in volume");
          + }
          + if (mode.equals("rw")) {
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else {
          + vol.setMode(Protos.Volume.Mode.RO);
          + }
          + vol.setHostPath(parts.get(0)).setContainerPath(parts.get(1));
          + break;
          + default:
          + throw new IllegalArgumentException("volume specification is invalid");
          — End diff –

          We should include the volume specification in the exception message.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104735340

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java —
          @@ -115,6 +122,12 @@ public ContaineredTaskManagerParameters containeredParameters() {
          return containeredParameters;
          }

          +
          — End diff –

          remove empty line

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104735237

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +263,71 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : Arrays.asList(specs)) {
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch(parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).toLowerCase().trim();
          + if (modeOrPath.equals("rw")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else if (modeOrPath.equals("ro")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RO);
          + } else {
          + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath);
          + }
          + break;
          + case 3:
          + String mode = parts.get(2).toLowerCase().trim();
          + if (!mode.equals("rw") && !mode.equals("ro")) {
          + throw new IllegalArgumentException("invalid mode in volume");
          + }
          + if (mode.equals("rw")) {
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else {
          + vol.setMode(Protos.Volume.Mode.RO);
          + }
          + vol.setHostPath(parts.get(0)).setContainerPath(parts.get(1));
          + break;
          + default:
          + throw new IllegalArgumentException("volume specification is invalid");
          +
          — End diff –

          remove empty line

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3481#discussion_r104735256

          — Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java —
          @@ -262,9 +263,71 @@ public String toString() {
          taskInfo.setContainer(containerInfo);
          }

          + containerInfo.addAllVolumes(volumes(params.containerVolumes()));
          +
          return taskInfo.build();
          }

          + /**
          + * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
          + *
          + * @param containerVolumes a comma delimited optional string of [host_path:]container_path[:RO|RW] that
          + * defines mount points for a container volume. If None or empty string, returns
          + * an empty iterator
          + */
          + public List<Protos.Volume> volumes(Option<String> containerVolumes) {
          + if (containerVolumes.isEmpty()) {
          + return new ArrayList<Protos.Volume>();
          + }
          + String[] specs = containerVolumes.get().split(",");
          + List<Protos.Volume> vols = new ArrayList<Protos.Volume>();
          + for (String s : Arrays.asList(specs)) {
          + if (s.isEmpty()) {
          + continue;
          + }
          + Protos.Volume.Builder vol = Protos.Volume.newBuilder();
          + vol.setMode(Protos.Volume.Mode.RW);
          +
          + List<String> parts = Arrays.asList(s.split(":"));
          + switch(parts.size()) {
          + case 1:
          + vol.setContainerPath(parts.get(0));
          + break;
          + case 2:
          + String modeOrPath = parts.get(1).toLowerCase().trim();
          + if (modeOrPath.equals("rw")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else if (modeOrPath.equals("ro")) {
          + vol.setContainerPath(parts.get(0));
          + vol.setMode(Protos.Volume.Mode.RO);
          + } else {
          + vol.setHostPath(parts.get(0)).setContainerPath(modeOrPath);
          + }
          + break;
          + case 3:
          + String mode = parts.get(2).toLowerCase().trim();
          + if (!mode.equals("rw") && !mode.equals("ro")) {
          + throw new IllegalArgumentException("invalid mode in volume");
          + }
          + if (mode.equals("rw")) {
          + vol.setMode(Protos.Volume.Mode.RW);
          + } else {
          + vol.setMode(Protos.Volume.Mode.RO);
          + }
          + vol.setHostPath(parts.get(0)).setContainerPath(parts.get(1));
          + break;
          + default:
          + throw new IllegalArgumentException("volume specification is invalid");
          +
          + }
          +
          — End diff –

          remove empty line

          githubbot ASF GitHub Bot added a comment -

          GitHub user addisonj opened a pull request:

          https://github.com/apache/flink/pull/3481

          FLINK-5975 Add volume support to flink-mesos

          When using containerization, specifically Docker, it is useful to be
          able to attach additional volumes, such as an NFS share.

          This adds support for attaching volumes by specifying a new config
          value, `mesos.resourcemanager.tasks.container.volumes`. This is a comma
          delimited string of `[host_path:]container_path[:RO|RW]`.

          It is modeled after the Spark Mesos framework.
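
To illustrate the comma-delimited format, here is a minimal sketch of how a value of `mesos.resourcemanager.tasks.container.volumes` might be broken into individual specs. `VolumeConfigSplitter` and `splitSpecs` are hypothetical names, the sample value `/mnt/nfs:/data:RO,,/scratch` is made up, and trimming whitespace around each entry is an assumption that goes beyond the PR's plain `isEmpty()` check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink. */
public class VolumeConfigSplitter {

    /** Splits the raw config value into non-empty volume specs. */
    public static List<String> splitSpecs(String configValue) {
        List<String> specs = new ArrayList<>();
        if (configValue == null) {
            return specs;
        }
        // Iterate over the String[] from split() directly; no wrapper list is needed.
        for (String s : configValue.split(",")) {
            String trimmed = s.trim();
            if (!trimmed.isEmpty()) {
                specs.add(trimmed);
            }
        }
        return specs;
    }
}
```

For the sample value this yields two specs, `/mnt/nfs:/data:RO` and `/scratch`. Each would then be parsed as `[host_path:]container_path[:RO|RW]`.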

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/addisonj/flink master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3481.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3481


          commit ddce9d20b0dd7a9dd9d5ea0a8cfb5ed61d5685c2
          Author: Addison Higham <ahigham@instructure.com>
          Date: 2017-03-07T06:40:16Z

          FLINK-5975 Add volume support to flink-mesos

          When using containerization, specifically Docker, it is useful to be
          able to attach additional volumes, such as an NFS share.

          This adds support for attaching volumes by specifying a new config
          value, `mesos.resourcemanager.tasks.container.volumes`. This is a
          comma-delimited string of `[host_path:]container_path[:RO|RW]`.

          It is modeled after the Spark Mesos framework.



            People

            • Assignee:
              addisonj@gmail.com Addison Higham
              Reporter:
              addisonj@gmail.com Addison Higham
            • Votes:
              0
              Watchers:
              3
