diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index 4da4ac5acb9..6187c75114a 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -28,6 +28,14 @@
       <outputDirectory>bin</outputDirectory>
       <fileMode>0755</fileMode>
     </fileSet>
+    <fileSet>
+      <directory>hadoop-yarn/bin</directory>
+      <outputDirectory>sbin</outputDirectory>
+      <fileMode>0755</fileMode>
+      <includes>
+        <include>docker_to_squash.py</include>
+      </includes>
+    </fileSet>
     <fileSet>
       <directory>hadoop-yarn/bin</directory>
       <outputDirectory>bin</outputDirectory>
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py b/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py
new file mode 100755
index 00000000000..5972588ccac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py
@@ -0,0 +1,1311 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+docker_to_squash.py is a tool that converts Docker images into SquashFS
+layers, manifests, and configs, and can upload them to HDFS.
+
+Tool dependencies: skopeo, squashfs-tools, tar, setfattr
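+
+Example usage (the image names, tags, and paths below are illustrative):
+
+  # Pull an image, convert its layers to SquashFS, upload everything to
+  # HDFS, and update the image-tag-to-hash file. Tags follow the image,
+  # comma-separated.
+  docker_to_squash.py --hadoop-prefix=/usr/local/hadoop \
+      pull-build-push-update library/centos:latest,centos
+
+  # List the tags currently stored under the HDFS root (default /runc-root).
+  docker_to_squash.py --hadoop-prefix=/usr/local/hadoop list-tags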
+"""
+
+import argparse
+from collections import Iterable
+import hashlib
+import json
+import logging
+import os
+import re
+import shutil
+import subprocess
+
+LOG_LEVEL = None
+HADOOP_BIN_DIR = None
+
+def shell_command(command, print_stdout, print_stderr, raise_on_error):
+ global LOG_LEVEL
+ stdout_val = subprocess.PIPE
+ stderr_val = subprocess.PIPE
+
+ logging.debug("command: %s", command)
+
+ if print_stdout:
+ stdout_val = None
+
+ if print_stderr or LOG_LEVEL == "DEBUG":
+ stderr_val = None
+
+ process = None
+ try:
+ process = subprocess.Popen(command, stdout=stdout_val,
+ stderr=stderr_val)
+ out, err = process.communicate()
+ except:
+ if process and process.poll() is None:
+ process.kill()
+ raise Exception("Popen failure")
+
+ if raise_on_error and process.returncode != 0:
+ raise Exception("Command: " + str(command)
+ + " failed with returncode: "
+ + str(process.returncode) + "\nstdout: "
+ + str(out) + "\nstderr: " + str(err))
+
+ return out, err, process.returncode
+
+def does_hdfs_entry_exist(entry):
+ out, err, returncode = hdfs_ls(entry)
+ if returncode != 0:
+ return False
+ return True
+
+def setup_hdfs_dir(dir_entry):
+ if does_hdfs_entry_exist(dir_entry):
+ hdfs_chmod("755", dir_entry)
+ return
+
+ hdfs_mkdir(dir_entry, raise_on_error=False)
+ if does_hdfs_entry_exist(dir_entry):
+ hdfs_chmod("755", dir_entry)
+ return
+
+ directories = dir_entry.split("/")[1:]
+ dir_path = ""
+ for directory in directories:
+ dir_path = dir_path + "/" + directory
+ if not does_hdfs_entry_exist(dir_path):
+ hdfs_mkdir(dir_entry, create_parents=True)
+ chmod_dir = dir_path
+ hdfs_chmod("755", chmod_dir, recursive=True)
+ break
+
+def append_or_extend_to_list(src, src_list):
+ if isinstance(src, list):
+ src_list.extend(src)
+ else:
+ src_list.append(src)
+
+def hdfs_get(src, dest, print_stdout=False, print_stderr=False, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"]
+ append_or_extend_to_list(src, command)
+ command.append(dest)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, raise_on_error=False):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"]
+ if options:
+ append_or_extend_to_list(options, command)
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"]
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
+ create_parents=False):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"]
+ if create_parents:
+ command.append("-p")
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_rm(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"]
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-put"]
+ if force:
+ command.append("-f")
+ append_or_extend_to_list(src, command)
+ command.append(dest)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
+ recursive=False):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"]
+ if recursive:
+ command.append("-R")
+ command.append(mode)
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)]
+ append_or_extend_to_list(file_path, command)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=False):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"]
+ if force:
+ command.append("-f")
+ append_or_extend_to_list(src, command)
+ command.append(dest)
+ out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, returncode
+
+def get_working_dir(directory):
+ try:
+ if os.path.isdir(directory):
+ working_dir = os.path.join(directory, "docker-to-squash")
+ else:
+ working_dir = directory
+ os.makedirs(working_dir)
+ except:
+ raise Exception("Could not create working_dir: " + working_dir)
+ return working_dir
+
+def is_sha256_hash(string):
+ if not re.findall(r"^[a-fA-F\d]{64,64}$", string):
+ return False
+ return True
+
+def calculate_file_hash(filename):
+ sha = hashlib.sha256()
+ with open(filename, 'rb') as file_pointer:
+ while True:
+ data = file_pointer.read(65536)
+ if not data:
+ break
+ sha.update(data)
+ return sha.hexdigest()
+
+def calculate_string_hash(string):
+ sha = hashlib.sha256()
+ sha.update(string)
+ return sha.hexdigest()
+
+def get_local_manifest_from_path(manifest_path):
+ with open(manifest_path, "rb") as file_pointer:
+ out = file_pointer.read()
+ manifest_hash = calculate_string_hash(str(out))
+ manifest = json.loads(out)
+ return manifest, manifest_hash
+
+def get_hdfs_manifest_from_path(manifest_path):
+ out, err, returncode = hdfs_cat(manifest_path)
+ manifest_hash = calculate_string_hash(str(out))
+ manifest = json.loads(out)
+ return manifest, manifest_hash
+
+def get_config_hash_from_manifest(manifest):
+ config_hash = manifest['config']['digest'].split(":", 1)[1]
+ return config_hash
+
+def get_layer_hashes_from_manifest(manifest):
+ layers = []
+ layers_dict = manifest['layers']
+ for layer in layers_dict:
+ layers.append(layer['digest'].split(":", 1)[1])
+ return layers
+
+def get_pull_fmt_string(pull_format):
+ pull_fmt_string = pull_format + ":"
+ if pull_format == "docker":
+ pull_fmt_string = pull_fmt_string + "//"
+ return pull_fmt_string
+
+def get_manifest_from_docker_image(pull_format, image):
+ pull_fmt_string = get_pull_fmt_string(pull_format)
+ out, err, returncode = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image],
+ False, True, True)
+ manifest = json.loads(out)
+ if 'manifests' in manifest:
+ logging.debug("skopeo inspect --raw returned a list of manifests")
+ manifests_dict = manifest['manifests']
+ sha = None
+ for mfest in manifests_dict:
+ if mfest['platform']['architecture'] == "amd64":
+ sha = mfest['digest']
+ break
+ if not sha:
+ raise Exception("Could not find amd64 manifest for" + image)
+
+ image_without_tag = image.split("/", 1)[-1].split(":", 1)[0]
+ image_and_sha = image_without_tag + "@" + sha
+
+ logging.debug("amd64 manifest sha is: %s", sha)
+
+ manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image_and_sha)
+ else:
+ manifest_hash = calculate_string_hash(str(out))
+
+ logging.debug("manifest: %s", str(manifest))
+ return manifest, manifest_hash
+
+def split_image_and_tag(image_and_tag):
+ split = image_and_tag.split(",")
+ image = split[0]
+ tags = split[1:]
+ return image, tags
+
+def read_image_tag_to_hash(image_tag_to_hash):
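+ # Each non-empty, non-comment line maps one or more comma-separated tags
+ # to a manifest hash, with an optional trailing "#" comment. An
+ # illustrative line: centos,centos:latest:<64-hex-sha256>#docker.io/centos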
+ hash_to_tags = dict()
+ tag_to_hash = dict()
+ with open(image_tag_to_hash, 'rb') as file_pointer:
+ while True:
+ line = file_pointer.readline()
+ if not line:
+ break
+ line = line.rstrip()
+
+ if not line:
+ continue
+
+ comment_split_line = line.split("#", 1)
+ line = comment_split_line[0]
+ comment = comment_split_line[1:]
+
+ split_line = line.rsplit(":", 1)
+ manifest_hash = split_line[-1]
+ tags_list = ' '.join(split_line[:-1]).split(",")
+
+ if not is_sha256_hash(manifest_hash) or not tags_list:
+ logging.warn("image-tag-to-hash file malformed. Skipping entry %s", line)
+ continue
+
+ tags_and_comments = hash_to_tags.get(manifest_hash, None)
+ if tags_and_comments is None:
+ known_tags = tags_list
+ known_comment = comment
+ else:
+ known_tags = tags_and_comments[0]
+ for tag in tags_list:
+ if tag not in known_tags:
+ known_tags.append(tag)
+ known_comment = tags_and_comments[1]
+ known_comment.extend(comment)
+
+ hash_to_tags[manifest_hash] = (known_tags, known_comment)
+
+ for tag in tags_list:
+ cur_manifest = tag_to_hash.get(tag, None)
+ if cur_manifest is not None:
+ logging.warn("tag_to_hash already has manifest %s defined for tag %s."
+ + "This entry will be overwritten", cur_manifest, tag)
+ tag_to_hash[tag] = manifest_hash
+ return hash_to_tags, tag_to_hash
+
+def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag):
+ if not hash_to_tags:
+ logging.debug("hash_to_tags is null. Not removing tag %s", tag)
+ return
+
+ prev_hash = tag_to_hash.get(tag, None)
+
+ if prev_hash is not None:
+ del tag_to_hash[tag]
+ prev_tags, prev_comment = hash_to_tags.get(prev_hash, (None, None))
+ prev_tags.remove(tag)
+ if not prev_tags:
+ del hash_to_tags[prev_hash]
+ else:
+ hash_to_tags[prev_hash] = (prev_tags, prev_comment)
+ else:
+ logging.debug("Tag not found. Not removing tag: %s", tag)
+
+def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash):
+ if not hash_to_tags:
+ logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash)
+ return
+ logging.debug("hash_to_tags: %s", str(hash_to_tags))
+ logging.debug("Removing image_hash from dicts: %s", image_hash)
+ prev_tags, prev_comments = hash_to_tags.get(image_hash, (None, None))
+
+ if prev_tags is not None:
+ hash_to_tags.pop(image_hash)
+ for tag in prev_tags:
+ del tag_to_hash[tag]
+
+def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
+ tag_to_hash[tag] = manifest_hash
+ new_tags_and_comments = hash_to_tags.get(manifest_hash, None)
+ if new_tags_and_comments is None:
+ new_tags = [tag]
+ new_comment = [comment]
+ else:
+ new_tags = new_tags_and_comments[0]
+ new_comment = new_tags_and_comments[1]
+ if tag not in new_tags:
+ new_tags.append(tag)
+ if comment and comment not in new_comment:
+ new_comment.append(comment)
+ hash_to_tags[manifest_hash] = (new_tags, new_comment)
+
+def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags):
+ with open(image_tag_to_hash, 'w') as file_pointer:
+ for key, value in hash_to_tags.iteritems():
+ manifest_hash = key
+ tags = ','.join(map(str, value[0]))
+ if tags:
+ comment = ', '.join(map(str, value[1]))
+ if comment:
+ comment = "#" + comment
+ file_pointer.write(tags + ":" + manifest_hash + comment + "\n")
+ else:
+ for comment in value[1]:
+ file_pointer.write("#" + comment + "\n")
+
+def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
+ manifest_hash, comment):
+ for tag in tags:
+ update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
+
+def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
+ remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
+ add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
+
+def remove_from_dicts(hash_to_tags, tag_to_hash, tags):
+ for tag in tags:
+ logging.debug("removing tag: %s", tag)
+ remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
+
+def populate_tag_dicts(hdfs_root, image_tag_to_hash, local_image_tag_to_hash):
+
+ if does_hdfs_entry_exist(hdfs_root + "/" + image_tag_to_hash):
+ hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash)
+ image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
+ else:
+ image_tag_to_hash_hash = 0
+
+ if image_tag_to_hash_hash != 0:
+ hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash)
+ else:
+ hash_to_tags = {}
+ tag_to_hash = {}
+ return hash_to_tags, tag_to_hash, image_tag_to_hash_hash
+
+
+def setup_squashfs_hdfs_dirs(layers, config, manifest):
+ dirs = [layers, config, manifest]
+ logging.debug("Setting up squashfs dirs: %s", str(dirs))
+ for dir_entry in dirs:
+ setup_hdfs_dir(dir_entry)
+
+def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir):
+ logging.info("Pulling image: %s", image)
+ if os.path.isdir(skopeo_dir):
+ raise Exception("Skopeo output directory already exists. "
+ + "Please delete and try again "
+ + "Directory: " + skopeo_dir)
+ pull_fmt_string = get_pull_fmt_string(pull_format)
+ shell_command(["skopeo", "copy", pull_fmt_string + image,
+ skopeo_format + ":" + skopeo_dir], False, True, True)
+
+def untar_layer(tmp_dir, layer_path):
+ shell_command(["sudo", "tar", "-C", tmp_dir, "--xattrs",
+ "--xattrs-include='*'", "-xf", layer_path],
+ False, True, True)
+
+def tar_file_search(archive, target):
+ out, err, returncode = shell_command(["tar", "-xf", archive, target, "-O"],
+ False, False, False)
+ return out
+
+def set_fattr(directory):
+ shell_command(["sudo", "setfattr", "-n", "trusted.overlay.opaque",
+ "-v", "y", directory], False, True, True)
+
+def make_whiteout_block_device(file_path, whiteout):
+ shell_command(["sudo", "mknod", "-m", "000", file_path,
+ "c", "0", "0"], False, True, True)
+
+ out, err, returncode = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
+ perms = str(out).strip()
+
+ shell_command(["sudo", "chown", perms, file_path], False, True, True)
+
+def convert_oci_whiteouts(tmp_dir):
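+ # Convert OCI/Docker whiteout markers into their overlayfs equivalents:
+ # a ".wh..wh..opq" entry marks its directory opaque via the
+ # trusted.overlay.opaque xattr, and a ".wh.<name>" entry becomes a 0/0
+ # character device that hides <name> in lower layers.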
+ out, err, returncode = shell_command(["sudo", "find", tmp_dir, "-name", ".wh.*"],
+ False, False, True)
+ whiteouts = str(out).splitlines()
+ for whiteout in whiteouts:
+ if not whiteout:
+ continue
+ basename = os.path.basename(whiteout)
+ directory = os.path.dirname(whiteout)
+ if basename == ".wh..wh..opq":
+ set_fattr(directory)
+ else:
+ whiteout_string = ".wh."
+ idx = basename.rfind(whiteout_string)
+ bname = basename[idx+len(whiteout_string):]
+ file_path = os.path.join(directory, bname)
+ make_whiteout_block_device(file_path, whiteout)
+ shell_command(["sudo", "rm", whiteout], False, True, True)
+
+def dir_to_squashfs(tmp_dir, squash_path):
+ shell_command(["sudo", "mksquashfs", tmp_dir, squash_path],
+ False, True, True)
+
+def upload_to_hdfs(file_path, file_name, hdfs_dir, replication, mode, force=False):
+ dest = hdfs_dir + "/" + file_name
+
+ if does_hdfs_entry_exist(dest):
+ if not force:
+ logging.warn("Not uploading to HDFS. File already exists: %s", dest)
+ return
+ logging.info("File already exists, but overwriting due to force option: %s", dest)
+
+ hdfs_put(file_path, dest, force)
+ hdfs_setrep(replication, dest)
+ hdfs_chmod(mode, dest)
+ logging.info("Uploaded file %s with replication %d and permissions %s",
+ dest, replication, mode)
+
+def upload_image_tag_to_hash_to_hdfs(file_path, file_name, hdfs_dir,
+ replication,
+ image_tag_to_hash_hash):
+ local_hash = calculate_file_hash(file_path)
+ if local_hash == image_tag_to_hash_hash:
+ logging.info("image_tag_to_hash file unchanged. Not uploading")
+ return
+ hdfs_file_path = hdfs_dir + "/" + file_name
+ try:
+ hdfs_put(file_path, hdfs_file_path, force=True)
+ hdfs_setrep(replication, hdfs_file_path)
+ hdfs_chmod("444", hdfs_file_path)
+
+ except:
+ raise Exception("image tag to hash file upload failed")
+
+def docker_to_squash(layer_dir, layer, working_dir):
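+ # Expand a single layer tarball, rewrite its whiteouts for overlayfs,
+ # and repack the result as <layer>.sqsh alongside the original tar.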
+ tmp_dir = os.path.join(working_dir, "expand_archive_" + layer)
+ layer_path = os.path.join(layer_dir, layer)
+ squash_path = layer_path + ".sqsh"
+
+ if os.path.isdir(tmp_dir):
+ raise Exception("tmp_dir already exists. Please delete and try again " +
+ "Directory: " + tmp_dir)
+ os.makedirs(tmp_dir)
+
+ try:
+ untar_layer(tmp_dir, layer_path)
+ convert_oci_whiteouts(tmp_dir)
+ dir_to_squashfs(tmp_dir, squash_path)
+ finally:
+ os.remove(layer_path)
+ shell_command(["sudo", "rm", "-rf", tmp_dir],
+ False, True, True)
+
+
+def check_image_for_magic_file(magic_file, skopeo_dir, layers):
+ logging.debug("Searching for magic file %s", magic_file)
+ for layer in layers:
+ ret = tar_file_search(os.path.join(skopeo_dir, layer), magic_file)
+ if ret:
+ logging.debug("Found magic file %s in layer %s", magic_file, layer)
+ logging.debug("Magic file %s has contents:\n%s", magic_file, ret)
+ return ret
+ raise Exception("Magic file %s doesn't exist in any layer" % (magic_file))
+
+def pull_build_push_update(args):
+ skopeo_format = args.skopeo_format
+ pull_format = args.pull_format
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+ replication = args.replication
+ force = args.force
+ images_and_tags = args.images_and_tags
+ check_magic_file = args.check_magic_file
+ magic_file = args.magic_file
+
+ hdfs_layers_dir = hdfs_root + "/layers"
+ hdfs_config_dir = hdfs_root + "/config"
+ hdfs_manifest_dir = hdfs_root + "/manifests"
+ working_dir = None
+
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+ setup_hdfs_dir(hdfs_root)
+ setup_squashfs_hdfs_dirs(hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir)
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
+ image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ for image_and_tag_arg in images_and_tags:
+ image, tags = split_image_and_tag(image_and_tag_arg)
+ if not image or not tags:
+ raise Exception("Positional parameter requires an image and at least 1 tag: "
+ + image_and_tag_arg)
+
+ logging.info("Working on image %s with tags %s", image, str(tags))
+ manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
+
+ layers = get_layer_hashes_from_manifest(manifest)
+
+ config_hash = get_config_hash_from_manifest(manifest)
+
+ logging.debug("Layers: %s", str(layers))
+ logging.debug("Config: %s", str(config_hash))
+
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
+ manifest_hash, image)
+
+ all_layers_exist = True
+
+ if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash):
+ all_layers_exist = False
+
+ if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash):
+ all_layers_exist = False
+
+ for layer in layers:
+ hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ if not does_hdfs_entry_exist(hdfs_squash_path):
+ all_layers_exist = False
+ break
+
+ if all_layers_exist:
+ if not force:
+ logging.info("All layers exist in HDFS, skipping this image")
+ continue
+ logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+
+ skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
+ logging.debug("skopeo_dir: %s", skopeo_dir)
+
+ skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
+
+ if check_magic_file:
+ check_image_for_magic_file(magic_file, skopeo_dir, layers)
+
+ for layer in layers:
+ logging.info("Squashifying and uploading layer: %s", layer)
+ hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ if does_hdfs_entry_exist(hdfs_squash_path):
+ if force:
+ logging.info("Layer already exists, but overwriting due to force"
+ + "option: %s", layer)
+ else:
+ logging.info("Layer exists. Skipping and not squashifying or"
+ + "uploading: %s", layer)
+ continue
+
+ docker_to_squash(skopeo_dir, layer, working_dir)
+ squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
+ squash_name = os.path.basename(squash_path)
+ upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
+
+
+ config_local_path = os.path.join(skopeo_dir, config_hash)
+ upload_to_hdfs(config_local_path,
+ os.path.basename(config_local_path),
+ hdfs_config_dir, replication, "444", force)
+
+ manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
+ upload_to_hdfs(manifest_local_path, manifest_hash,
+ hdfs_manifest_dir, replication, "444", force)
+
+ write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
+ upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
+ hdfs_root, replication,
+ image_tag_to_hash_hash)
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shell_command(["sudo", "rm", "-rf", working_dir],
+ False, True, True)
+
+def pull_build(args):
+ skopeo_format = args.skopeo_format
+ pull_format = args.pull_format
+ images_and_tags = args.images_and_tags
+ check_magic_file = args.check_magic_file
+ magic_file = args.magic_file
+
+ for image_and_tag_arg in images_and_tags:
+ image, tags = split_image_and_tag(image_and_tag_arg)
+ if not image or not tags:
+ raise Exception("Positional parameter requires an image and at least 1 tag: "
+ + image_and_tag_arg)
+
+ logging.info("Working on image %s with tags %s", image, str(tags))
+ manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
+
+ layers = get_layer_hashes_from_manifest(manifest)
+
+ config_hash = get_config_hash_from_manifest(manifest)
+
+ logging.debug("Layers: %s", str(layers))
+ logging.debug("Config: %s", str(config_hash))
+
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
+ logging.debug("skopeo_dir: %s", skopeo_dir)
+ skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
+
+ if check_magic_file:
+ check_image_for_magic_file(magic_file, skopeo_dir, layers)
+
+ for layer in layers:
+ logging.info("Squashifying layer: %s", layer)
+ docker_to_squash(skopeo_dir, layer, working_dir)
+
+ except:
+ if os.path.isdir(skopeo_dir):
+ shutil.rmtree(skopeo_dir)
+ raise
+
+def push_update(args):
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+ replication = args.replication
+ force = args.force
+ images_and_tags = args.images_and_tags
+
+ hdfs_layers_dir = hdfs_root + "/layers"
+ hdfs_config_dir = hdfs_root + "/config"
+ hdfs_manifest_dir = hdfs_root + "/manifests"
+ local_image_tag_to_hash = None
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+ setup_hdfs_dir(hdfs_root)
+ setup_squashfs_hdfs_dirs(hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir)
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
+ image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ for image_and_tag_arg in images_and_tags:
+ image, tags = split_image_and_tag(image_and_tag_arg)
+ if not image or not tags:
+ raise Exception("Positional parameter requires an image and at least 1 tag: "
+ + image_and_tag_arg)
+
+ logging.info("Working on image %s with tags %s", image, str(tags))
+ skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
+ if not os.path.exists(skopeo_dir):
+ raise Exception("skopeo_dir doesn't exists: %s" % (skopeo_dir))
+ manifest, manifest_hash = get_local_manifest_from_path(skopeo_dir + "/manifest.json")
+
+ layers = get_layer_hashes_from_manifest(manifest)
+
+ config_hash = get_config_hash_from_manifest(manifest)
+
+ logging.debug("Layers: %s", str(layers))
+ logging.debug("Config: %s", str(config_hash))
+
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
+ manifest_hash, image)
+
+ all_layers_exist = True
+
+ if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash):
+ all_layers_exist = False
+
+ if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash):
+ all_layers_exist = False
+
+ for layer in layers:
+ hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ if not does_hdfs_entry_exist(hdfs_squash_path):
+ all_layers_exist = False
+ break
+
+ if all_layers_exist:
+ if not force:
+ logging.info("All layers exist in HDFS, skipping this image")
+ continue
+ logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+
+ for layer in layers:
+ hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ if does_hdfs_entry_exist(hdfs_squash_path):
+ if force:
+ logging.info("Layer already exists, but overwriting due to force"
+ + "option: %s", layer)
+ else:
+ logging.info("Layer exists. Skipping and not squashifying or"
+ + "uploading: %s", layer)
+ continue
+
+ squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
+ squash_name = os.path.basename(squash_path)
+ upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
+
+
+ config_local_path = os.path.join(skopeo_dir, config_hash)
+ upload_to_hdfs(config_local_path,
+ os.path.basename(config_local_path),
+ hdfs_config_dir, replication, "444", force)
+
+ manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
+ upload_to_hdfs(manifest_local_path, manifest_hash,
+ hdfs_manifest_dir, replication, "444", force)
+
+ write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
+ upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
+ hdfs_root, replication,
+ image_tag_to_hash_hash)
+ finally:
+ if local_image_tag_to_hash:
+ if os.path.isfile(local_image_tag_to_hash):
+ os.remove(local_image_tag_to_hash)
+
+
+def remove_image(args):
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+ replication = args.replication
+ images_or_tags = args.images_or_tags
+ working_dir = None
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
+ image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ logging.debug("hash_to_tags: %s", str(hash_to_tags))
+ logging.debug("tag_to_hash: %s", str(tag_to_hash))
+
+ hdfs_layers_dir = hdfs_root + "/layers"
+ hdfs_config_dir = hdfs_root + "/config"
+ hdfs_manifest_dir = hdfs_root + "/manifests"
+
+ delete_list = []
+
+ known_images, err, returncode = hdfs_ls(hdfs_manifest_dir, "-C", False, False, False)
+ known_images = known_images.split()
+
+ logging.debug("known_images:\n%s", known_images)
+
+ layers_to_keep = []
+
+ images_and_tags_to_remove = []
+ images_to_remove = []
+ for image_or_tag_arg in images_or_tags:
+ images_and_tags_to_remove.extend(image_or_tag_arg.split(","))
+
+ logging.debug("images_and_tags_to_remove:\n%s", images_and_tags_to_remove)
+
+ if isinstance(images_and_tags_to_remove, Iterable):
+ for image in images_and_tags_to_remove:
+ if is_sha256_hash(image):
+ image_hash = image
+ else:
+ image_hash = tag_to_hash.get(image, None)
+ if image_hash:
+ images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
+ else:
+ image = images_and_tags_to_remove[0]
+ if is_sha256_hash(image):
+ image_hash = image
+ else:
+ image_hash = tag_to_hash.get(image, None)
+ if image_hash:
+ images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
+
+ logging.debug("images_to_remove:\n%s", images_to_remove)
+ if not images_to_remove:
+ logging.warn("No images to remove")
+ return
+
+ for image in known_images:
+ if image not in images_to_remove:
+ manifest, manifest_hash = get_hdfs_manifest_from_path(image)
+ layers = get_layer_hashes_from_manifest(manifest)
+ layers_to_keep.extend(layers)
+
+ logging.debug("layers_to_keep:\n%s", layers_to_keep)
+
+ for image_or_tag_arg in images_or_tags:
+ images = image_or_tag_arg.split(",")
+ for image in images:
+ logging.info("removing image: %s", image)
+ if is_sha256_hash(image):
+ logging.debug("image is sha256")
+ image_hash = image
+ else:
+ image_hash = tag_to_hash.get(image, None)
+ if image_hash:
+ logging.debug("image tag exists for %s", image)
+ else:
+ logging.info("Not removing %s. Image tag doesn't exist", image)
+ continue
+ manifest_path = hdfs_manifest_dir + "/" + image_hash
+ if does_hdfs_entry_exist(manifest_path):
+ logging.debug("image manifest for %s exists: %s", image, manifest_path)
+ else:
+ logging.info("Not removing %s. Image manifest doesn't exist: %s", image, manifest_path)
+ continue
+
+ delete_list.append(manifest_path)
+
+ manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
+
+ config_hash = get_config_hash_from_manifest(manifest)
+ logging.debug("config_hash: %s", config_hash)
+
+ delete_list.append(hdfs_config_dir + "/" + config_hash)
+
+ layers = get_layer_hashes_from_manifest(manifest)
+ layers_paths = []
+ for layer in layers:
+ if layer not in layers_to_keep:
+ layers_paths.append(hdfs_layers_dir + "/" + layer + ".sqsh")
+ delete_list.extend(layers_paths)
+
+ logging.debug("delete_list: %s", delete_list)
+
+ remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash)
+
+ write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
+ upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
+ hdfs_root, replication,
+ image_tag_to_hash_hash)
+
+ hdfs_rm(delete_list)
+
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shutil.rmtree(working_dir)
+
+def add_remove_tag(args):
+ pull_format = args.pull_format
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+ replication = args.replication
+ sub_command = args.sub_command
+ images_and_tags = args.images_and_tags
+
+ hdfs_manifest_dir = hdfs_root + "/manifests"
+ working_dir = None
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
+ image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ for image_and_tag_arg in images_and_tags:
+ if sub_command == "add-tag":
+ image, tags = split_image_and_tag(image_and_tag_arg)
+ if is_sha256_hash(image):
+ manifest_hash = image
+ else:
+ manifest_hash = tag_to_hash.get(image, None)
+
+ if manifest_hash:
+ manifest_path = hdfs_manifest_dir + "/" + manifest_hash
+ out, err, returncode = hdfs_cat(manifest_path)
+ manifest = json.loads(out)
+ logging.debug("image tag exists for %s", image)
+ else:
+ manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
+
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
+ manifest_hash, image)
+
+ elif sub_command == "remove-tag":
+ tags = image_and_tag_arg.split(",")
+ image = None
+ manifest = None
+ manifest_hash = 0
+ remove_from_dicts(hash_to_tags, tag_to_hash, tags)
+ else:
+ raise Exception("Invalid sub_command: %s" % (sub_command))
+
+ write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
+ upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
+ hdfs_root, replication,
+ image_tag_to_hash_hash)
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shutil.rmtree(working_dir)
+
+def copy_update(args):
+ image_tag_to_hash = args.image_tag_to_hash
+ replication = args.replication
+ force = args.force
+ src_root = args.src_root
+ dest_root = args.dest_root
+ images_and_tags = args.images_and_tags
+
+ src_layers_dir = src_root + "/layers"
+ src_config_dir = src_root + "/config"
+ src_manifest_dir = src_root + "/manifests"
+ dest_layers_dir = dest_root + "/layers"
+ dest_config_dir = dest_root + "/config"
+ dest_manifest_dir = dest_root + "/manifests"
+
+ setup_hdfs_dir(dest_root)
+ setup_squashfs_hdfs_dirs(dest_layers_dir, dest_config_dir, dest_manifest_dir)
+ working_dir = None
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_src_image_tag_to_hash = os.path.join(working_dir, "src-"
+ + os.path.basename(image_tag_to_hash))
+ local_dest_image_tag_to_hash = os.path.join(working_dir, "dest-"
+ + os.path.basename(image_tag_to_hash))
+
+ src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts(src_root, image_tag_to_hash, local_src_image_tag_to_hash)
+ dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts(dest_root, image_tag_to_hash, local_dest_image_tag_to_hash)
+
+ for image_and_tag_arg in images_and_tags:
+ image, tags = split_image_and_tag(image_and_tag_arg)
+ if not image:
+ raise Exception("Positional parameter requires an image: " + image_and_tag_arg)
+ if not tags:
+ logging.debug("Tag not given. Using image tag instead: %s", image)
+ tags = [image]
+
+ src_manifest_hash = src_tag_to_hash.get(image, None)
+ if not src_manifest_hash:
+ logging.info("Manifest not found for image %s. Skipping", image)
+ continue
+
+ src_manifest_path = src_manifest_dir + "/" + src_manifest_hash
+ dest_manifest_path = dest_manifest_dir + "/" + src_manifest_hash
+ src_manifest, src_manifest_hash = get_hdfs_manifest_from_path(src_manifest_path)
+
+ src_config_hash = get_config_hash_from_manifest(src_manifest)
+ src_config_path = src_config_dir + "/" + src_config_hash
+ dest_config_path = dest_config_dir + "/" + src_config_hash
+
+ src_layers = get_layer_hashes_from_manifest(src_manifest)
+ src_layers_paths = [src_layers_dir + "/" + layer + ".sqsh" for layer in src_layers]
+ dest_layers_paths = [dest_layers_dir + "/" + layer + ".sqsh" for layer in src_layers]
+
+ logging.debug("Copying Manifest: %s", str(src_manifest_path))
+ logging.debug("Copying Layers: %s", str(src_layers_paths))
+ logging.debug("Copying Config: %s", str(src_config_hash))
+
+ hdfs_cp(src_layers_paths, dest_layers_dir, force)
+ hdfs_cp(src_config_path, dest_config_dir, force)
+ hdfs_cp(src_manifest_path, dest_manifest_dir, force)
+
+ hdfs_setrep(replication, dest_layers_paths)
+ hdfs_setrep(replication, dest_config_path)
+ hdfs_setrep(replication, dest_manifest_path)
+
+ hdfs_chmod("444", dest_layers_paths)
+ hdfs_chmod("444", dest_config_path)
+ hdfs_chmod("444", dest_manifest_path)
+
+ for tag in tags:
+ new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
+ if new_tags_and_comments:
+ comment = ', '.join(map(str, new_tags_and_comments[1]))
+ else:
+ comment = ""
+ if not comment:
+ comment = image
+
+ update_dicts(dest_hash_to_tags, dest_tag_to_hash, tag, src_manifest_hash, comment)
+
+ write_local_image_tag_to_hash(local_dest_image_tag_to_hash, dest_hash_to_tags)
+ upload_image_tag_to_hash_to_hdfs(local_dest_image_tag_to_hash, image_tag_to_hash,
+ dest_root, replication,
+ dest_image_tag_to_hash_hash)
+
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shutil.rmtree(working_dir)
+
+def query_tag(args):
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+ tags = args.tags
+ working_dir = None
+
+ try:
+ working_dir = get_working_dir(args.working_dir)
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
+ image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ logging.debug("hash_to_tags: %s", str(hash_to_tags))
+ logging.debug("tag_to_hash: %s", str(tag_to_hash))
+
+ hdfs_layers_dir = hdfs_root + "/layers"
+ hdfs_config_dir = hdfs_root + "/config"
+ hdfs_manifest_dir = hdfs_root + "/manifests"
+
+ for tag in tags:
+ image_hash = tag_to_hash.get(tag, None)
+ if not image_hash:
+ logging.info("image hash mapping doesn't exist for tag %s", tag)
+ continue
+
+ manifest_path = hdfs_manifest_dir + "/" + image_hash
+ if does_hdfs_entry_exist(manifest_path):
+ logging.debug("image manifest for %s exists: %s", tag, manifest_path)
+ else:
+ logging.info("Image manifest for %s doesn't exist: %s", tag, manifest_path)
+ continue
+
+ manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
+ layers = get_layer_hashes_from_manifest(manifest)
+ config_hash = get_config_hash_from_manifest(manifest)
+ config_path = hdfs_config_dir + "/" + config_hash
+
+ layers_paths = [hdfs_layers_dir + "/" + layer + ".sqsh" for layer in layers]
+
+ logging.info("Image info for '%s'", tag)
+ logging.info(manifest_path)
+ logging.info(config_path)
+ for layer in layers_paths:
+ logging.info(layer)
+
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shutil.rmtree(working_dir)
+
+def list_tags(args):
+ hdfs_root = args.hdfs_root
+ image_tag_to_hash = args.image_tag_to_hash
+
+ hdfs_image_tag_to_hash = hdfs_root + "/" + image_tag_to_hash
+ if does_hdfs_entry_exist(hdfs_image_tag_to_hash):
+ hdfs_cat(hdfs_image_tag_to_hash, True, True, False)
+ else:
+ logging.error("image-tag-to-hash file doesn't exist: %s", hdfs_image_tag_to_hash)
+
+def create_parsers():
+ parser = argparse.ArgumentParser()
+ add_parser_default_arguments(parser)
+
+ subparsers = parser.add_subparsers(help='sub help', dest='sub_command')
+
+ parse_pull_build_push_update = subparsers.add_parser('pull-build-push-update',
+ help='Pull an image, build its squashfs '
+ + 'layers, push it to hdfs, and '
+ + 'atomically update the '
+ + 'image-tag-to-hash file')
+ parse_pull_build_push_update.set_defaults(func=pull_build_push_update)
+ add_parser_default_arguments(parse_pull_build_push_update)
+ parse_pull_build_push_update.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_pull_build = subparsers.add_parser('pull-build',
+ help='Pull an image and build its squashfs layers')
+ parse_pull_build.set_defaults(func=pull_build)
+ add_parser_default_arguments(parse_pull_build)
+ parse_pull_build.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_push_update = subparsers.add_parser('push-update',
+ help='Push the squashfs layers to hdfs and update '
+ + 'the image-tag-to-hash file')
+ parse_push_update.set_defaults(func=push_update)
+ add_parser_default_arguments(parse_push_update)
+ parse_push_update.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_remove_image = subparsers.add_parser('remove-image',
+ help='Remove an image (manifest, config, layers) '
+ + 'from hdfs based on its tag or manifest hash')
+ parse_remove_image.set_defaults(func=remove_image)
+ add_parser_default_arguments(parse_remove_image)
+ parse_remove_image.add_argument("images_or_tags", nargs="+",
+ help="Image or tag argument (can specify multiple)")
+
+ parse_remove_tag = subparsers.add_parser('remove-tag',
+ help='Remove an image-to-tag mapping in the '
+ + 'image-tag-to-hash file')
+ parse_remove_tag.set_defaults(func=add_remove_tag)
+ add_parser_default_arguments(parse_remove_tag)
+ parse_remove_tag.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_add_tag = subparsers.add_parser('add-tag',
+ help='Add an image-to-tag mapping in the '
+ + 'image-tag-to-hash file')
+ parse_add_tag.set_defaults(func=add_remove_tag)
+ add_parser_default_arguments(parse_add_tag)
+ parse_add_tag.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_copy_update = subparsers.add_parser('copy-update',
+ help='Copy an image from hdfs in one cluster to '
+ + 'another and then update the '
+ + 'image-tag-to-hash file')
+ parse_copy_update.set_defaults(func=copy_update)
+ add_parser_default_arguments(parse_copy_update)
+ parse_copy_update.add_argument("src_root",
+ help="HDFS path for source root directory")
+ parse_copy_update.add_argument("dest_root",
+ help="HDFS path for destination root directory")
+ parse_copy_update.add_argument("images_and_tags", nargs="+",
+ help="Image and tag argument (can specify multiple)")
+
+ parse_query_tag = subparsers.add_parser('query-tag',
+ help='Get the manifest, config, and layers '
+ + 'associated with a tag')
+ parse_query_tag.set_defaults(func=query_tag)
+ add_parser_default_arguments(parse_query_tag)
+ parse_query_tag.add_argument("tags", nargs="+",
+ help="Image or tag argument (can specify multiple)")
+
+ parse_list_tags = subparsers.add_parser('list-tags',
+ help='List all tags in image-tag-to-hash file')
+ parse_list_tags.set_defaults(func=list_tags)
+ add_parser_default_arguments(parse_list_tags)
+
+ return parser
+
+def add_parser_default_arguments(parser):
+ parser.add_argument("--working-dir", type=str, dest='working_dir', default="dts-work-dir",
+ help="Name of working directory")
+ parser.add_argument("--skopeo-format", type=str, dest='skopeo_format',
+ default='dir', help="Output format for skopeo copy")
+ parser.add_argument("--pull-format", type=str, dest='pull_format',
+ default='docker', help="Pull format for skopeo")
+ parser.add_argument("-l", "--log", type=str, dest='LOG_LEVEL',
+ default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
+ parser.add_argument("--hdfs-root", type=str, dest='hdfs_root',
+ default='/runc-root', help="The root directory in HDFS for all of the "
+ + "squashfs images")
+ parser.add_argument("--image-tag-to-hash", type=str,
+ dest='image_tag_to_hash', default='image-tag-to-hash',
+ help="image-tag-to-hash filename in hdfs")
+ parser.add_argument("-r", "--replication", type=int, dest='replication',
+ default=1, help="Replication factor for all files uploaded to HDFS")
+ parser.add_argument("--hadoop-prefix", type=str, dest='hadoop_prefix',
+ default=os.environ.get('HADOOP_PREFIX'),
+ help="hadoop prefix value for environment")
+ parser.add_argument("-f", "--force", dest='force',
+ action="store_true", default=False, help="Force overwrites in HDFS")
+ parser.add_argument("--check-magic-file", dest='check_magic_file',
+ action="store_true", default=False, help="Check for a specific magic file"
+ + "in the image before uploading")
+ parser.add_argument("--magic-file", type=str, dest='magic_file',
+ default='etc/dockerfile-version', help="The magic file to check for "
+ + "in the image")
+ return parser
+
+def check_dependencies():
+ try:
+ command = [HADOOP_BIN_DIR + "/hadoop", "version"]
+ shell_command(command, False, False, True)
+ except:
+ logging.error("Could not find hadoop. Make sure HADOOP_PREFIX " +
+ "is set correctly either in your environment or on the command line via " +
+ "via --hadoop-prefix")
+ return 1
+
+ try:
+ command = ["skopeo", "-v"]
+ shell_command(command, False, False, True)
+ except:
+ logging.error("Could not find skopeo. Make sure it is installed and present " +
+ "on the PATH")
+ return 1
+
+ try:
+ command = ["mksquashfs", "-version"]
+ shell_command(command, False, False, True)
+ except:
+ logging.error("Could not find mksquashfs. Make sure squashfs-tools is installed " +
+ "and mksquashfs is present on the the PATH")
+ return 1
+
+ try:
+ command = ["tar", "--version"]
+ shell_command(command, False, False, True)
+ except:
+ logging.error("Could not find tar. Make sure it is installed and present " +
+ "on the PATH")
+ return 1
+
+ try:
+ command = ["setfattr", "--version"]
+ shell_command(command, False, False, True)
+ except:
+ logging.error("Could not find setfattr . Make sure it is installed and present " +
+ "on the PATH")
+ return 1
+
+ return 0
+
+def main():
+ global LOG_LEVEL
+ global HADOOP_BIN_DIR
+ parser = create_parsers()
+ args, extra = parser.parse_known_args()
+
+ if extra:
+ raise Exception("Extra unknown arguments given: %s" % (extra))
+
+ LOG_LEVEL = args.LOG_LEVEL.upper()
+ image_tag_to_hash = args.image_tag_to_hash
+
+ numeric_level = getattr(logging, LOG_LEVEL, None)
+ if not isinstance(numeric_level, int):
+ logging.error("Invalid log level: %s", LOG_LEVEL)
+ return
+ logging.basicConfig(format="%(levelname)s: %(message)s", level=numeric_level)
+
+ if args.hadoop_prefix is None:
+ logging.error("Hadoop prefix is not set. You may set it either " +
+ "in your environment or via --hadoop-prefix")
+ return
+
+ HADOOP_BIN_DIR = args.hadoop_prefix + "/bin"
+
+ if check_dependencies():
+ return
+
+ if "/" in image_tag_to_hash:
+ logging.error("image-tag-to-hash cannot contain a /")
+ return
+
+ logging.debug("args: %s", str(args))
+ logging.debug("extra: %s", str(extra))
+ logging.debug("image-tag-to-hash: %s", image_tag_to_hash)
+ logging.debug("LOG_LEVEL: %s", LOG_LEVEL)
+ logging.debug("HADOOP_BIN_DIR: %s", str(HADOOP_BIN_DIR))
+
+ args.func(args)
+
+if __name__ == "__main__":
+ main()