diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 4da4ac5acb9..6187c75114a 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -28,6 +28,14 @@ bin 0755 + + hadoop-yarn/bin + sbin + 0755 + + docker_to_squash.py + + hadoop-yarn/bin bin diff --git a/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py b/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py new file mode 100755 index 00000000000..bb33524a239 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/bin/docker_to_squash.py @@ -0,0 +1,1283 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +docker_to_squash.py is a tool to facilitate the process of converting +Docker images into squashFS layers, manifests, and configs. + +Tool dependencies: skopeo, squashfs-tools, tar +""" + +import argparse +from collections import Iterable +import hashlib +import json +import logging +import os +import re +import shutil +import subprocess +import sys + +LOG_LEVEL = None +HADOOP_BIN_DIR = None + +def shell_command(command, print_stdout, print_stderr, raise_on_error): + global LOG_LEVEL + stdout_val = subprocess.PIPE + stderr_val = subprocess.PIPE + + logging.debug("command: %s", command) + + if print_stdout: + stdout_val = None + + if print_stderr or LOG_LEVEL == "DEBUG": + stderr_val = None + + process = None + try: + process = subprocess.Popen(command, stdout=stdout_val, + stderr=stderr_val) + out, err = process.communicate() + except: + if process and process.poll() is None: + process.kill() + raise Exception("Popen failure") + + if raise_on_error and process.returncode is not 0: + raise Exception("Commmand: " + str(command) + + " failed with returncode: " + + str(process.returncode) + "\nstdout: " + + str(out) + "\nstderr: " + str(err)) + + return out, err, process.returncode + +def does_hdfs_entry_exist(entry): + out, err, returncode = hdfs_ls(entry) + if returncode is not 0: + return False + return True + +def setup_hdfs_dir(dir_entry): + if does_hdfs_entry_exist(dir_entry): + hdfs_chmod("755", dir_entry) + return + + out, err, returncode = hdfs_mkdir(dir_entry, raise_on_error=False) + if does_hdfs_entry_exist(dir_entry): + hdfs_chmod("755", dir_entry) + return + + directories = dir_entry.split("/")[1:] + dir = "" + for directory in directories: + dir = dir + "/" + directory + if not does_hdfs_entry_exist(dir): + hdfs_mkdir(dir_entry, create_parents=True) + chmod_dir = dir + hdfs_chmod("755", chmod_dir, recursive=True) + break + +def append_or_extend_to_list(src, src_list): + if isinstance(src, list): + src_list.extend(src) + else: + src_list.append(src) + +def hdfs_get(src, dest, 
print_stdout=False, print_stderr=False, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"] + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, raise_on_error=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"] + if options: + append_or_extend_to_list(options, command) + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True, create_parents=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"] + if create_parents: + command.append("-p") + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_rm(file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-put"] + if force: + command.append("-f") + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True, recursive=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"] + if recursive: + command.append("-R") + command.append(mode) + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)] + append_or_extend_to_list(file_path, command) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=False): + global HADOOP_BIN_DIR + command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"] + if force: + command.append("-f") + append_or_extend_to_list(src, command) + command.append(dest) + out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error) + return out, err, returncode + +def get_working_dir(directory): + try: + if os.path.isdir(directory): + working_dir = os.path.join(directory, "docker-to-squash") + else: + 
working_dir = directory + os.makedirs(working_dir) + except: + raise Exception("Could not create working_dir: " + working_dir) + return working_dir + +def is_sha256_hash(string): + if not re.findall(r"^[a-fA-F\d]{64,64}$", string): + return False + return True + +def calculate_file_hash(filename): + sha = hashlib.sha256() + with open(filename, 'rb') as file_pointer: + while True: + data = file_pointer.read(65536) + if not data: + break + sha.update(data) + return sha.hexdigest() + +def calculate_string_hash(string): + sha = hashlib.sha256() + sha.update(string) + return sha.hexdigest() + +def get_local_manifest_from_path(manifest_path): + with open(manifest_path, "rb") as file_pointer: + out = file_pointer.read() + manifest_hash = calculate_string_hash(str(out)) + manifest = json.loads(out) + return manifest, manifest_hash + +def get_hdfs_manifest_from_path(manifest_path): + out, err, returncode = hdfs_cat(manifest_path) + manifest_hash = calculate_string_hash(str(out)) + manifest = json.loads(out) + return manifest, manifest_hash + +def get_config_hash_from_manifest(manifest): + config_hash = manifest['config']['digest'].split(":", 1)[1] + return config_hash + +def get_layer_hashes_from_manifest(manifest): + layers = [] + layers_dict = manifest['layers'] + for layer in layers_dict: + layers.append(layer['digest'].split(":", 1)[1]) + return layers + +def get_manifest_from_docker_image(pull_format, image): + out, err, returncode = shell_command(["skopeo", "inspect", "--raw", pull_format + "://" + image], + False, True, True) + manifest = json.loads(out) + if 'manifests' in manifest: + logging.debug("skopeo inspect --raw returned a list of manifests") + manifests_dict = manifest['manifests'] + sha = None + for mfest in manifests_dict: + if(mfest['platform']['architecture'] == "amd64"): + sha = mfest['digest'] + break + if not sha: + raise Exception("Could not find amd64 manifest for" + image) + + image_without_tag = image.split("/", 1)[-1].split(":", 1)[0] + image_and_sha = image_without_tag + "@" + sha + + logging.debug("amd64 manifest sha is: " + sha) + + manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image_and_sha) + else: + manifest_hash = calculate_string_hash(str(out)) + + logging.info("manifest: " + str(manifest)) + return manifest, manifest_hash + +def split_image_and_tag(image_and_tag): + split = image_and_tag.split(",") + image = split[0] + tags = split[1:] + return image, tags + +def read_image_tag_to_hash(image_tag_to_hash): + hash_to_tags = dict() + tag_to_hash = dict() + with open(image_tag_to_hash, 'rb') as file_pointer: + while True: + line = file_pointer.readline() + if not line: + break + line = line.rstrip() + + if not line: + continue + + comment_split_line = line.split("#", 1) + line = comment_split_line[0] + comment = comment_split_line[1:] + + split_line = line.rsplit(":", 1) + manifest_hash = split_line[-1] + tags_list = ' '.join(split_line[:-1]).split(",") + + if not is_sha256_hash(manifest_hash) or not tags_list: + logging.warn("image-tag-to-hash file malformed. 
Skipping entry %s", line) + continue + + tags_and_comments = hash_to_tags.get(manifest_hash, None) + if tags_and_comments is None: + known_tags = tags_list + known_comment = comment + else: + known_tags = tags_and_comments[0] + for tag in tags_list: + if tag not in known_tags: + known_tags.append(tag) + known_comment = tags_and_comments[1] + known_comment.extend(comment) + + hash_to_tags[manifest_hash] = (known_tags, known_comment) + + for tag in tags_list: + cur_manifest = tag_to_hash.get(tag, None) + if cur_manifest is not None: + logging.warn("tag_to_hash already has manifest %s defined for tag %s." + + "This entry will be overwritten", cur_manifest, tag) + tag_to_hash[tag] = manifest_hash + return hash_to_tags, tag_to_hash + +def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag): + if not hash_to_tags: + logging.debug("hash_to_tags is null. Not removing tag %s", tag) + return + + prev_hash = tag_to_hash.get(tag, None) + + if prev_hash is not None: + del tag_to_hash[tag] + prev_tags, prev_comment = hash_to_tags.get(prev_hash, (None, None)) + prev_tags.remove(tag) + if prev_tags == 0: + del hash_to_tags[prev_hash] + else: + hash_to_tags[prev_hash] = (prev_tags, prev_comment) + else: + logging.debug("Tag not found. Not removing tag: %s", tag) + +def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash): + if not hash_to_tags: + logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash) + return + logging.debug("hash_to_tags: %s", str(hash_to_tags)) + logging.debug("Removing image_hash from dicts: %s", image_hash) + prev_tags, prev_comments = hash_to_tags.get(image_hash, None) + + if prev_tags is not None: + hash_to_tags.pop(image_hash) + for tag in prev_tags: + del tag_to_hash[tag] + +def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment): + tag_to_hash[tag] = manifest_hash + new_tags_and_comments = hash_to_tags.get(manifest_hash, None) + if new_tags_and_comments is None: + new_tags = [tag] + new_comment = [comment] + else: + new_tags = new_tags_and_comments[0] + new_comment = new_tags_and_comments[1] + if tag not in new_tags: + new_tags.append(tag) + if comment and comment not in new_comment: + new_comment.append(comment) + hash_to_tags[manifest_hash] = (new_tags, new_comment) + +def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags): + with open(image_tag_to_hash, 'w') as file_pointer: + for key, value in hash_to_tags.iteritems(): + manifest_hash = key + tags = ','.join(map(str, value[0])) + if tags: + comment = ', '.join(map(str, value[1])) + if comment > 0: + comment = "#" + comment + file_pointer.write(tags + ":" + manifest_hash + comment + "\n") + else: + for comment in value[1]: + file_pointer.write("#" + comment + "\n") + +def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, + manifest_hash, comment): + for tag in tags: + update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment) + +def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment): + remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag) + add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment) + +def remove_from_dicts(hash_to_tags, tag_to_hash, tags): + for tag in tags: + logging.debug("removing tag: %s", tag) + remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag) + +def populate_tag_dicts(hdfs_root, image_tag_to_hash, local_image_tag_to_hash): + + if does_hdfs_entry_exist(hdfs_root + "/" + image_tag_to_hash): + hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash) + 
image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash) + else: + image_tag_to_hash_hash = 0 + + if image_tag_to_hash_hash != 0: + hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash) + else: + hash_to_tags = {} + tag_to_hash = {} + return hash_to_tags, tag_to_hash, image_tag_to_hash_hash + + +def setup_squashfs_hdfs_dirs(layers, config, manifest): + dirs = [layers, config, manifest] + logging.debug("Setting up squashfs dirs: %s", str(dirs)) + for dir_entry in dirs: + setup_hdfs_dir(dir_entry) + +def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir): + logging.info("Pulling image: %s", image) + if os.path.isdir(skopeo_dir): + raise Exception("Skopeo output directory already exists. " + + "Please delete and try again " + + "Directory: " + skopeo_dir) + shell_command(["skopeo", "copy", pull_format + "://" + image, + skopeo_format + ":" + skopeo_dir], False, True, True) + +def untar_layer(tmp_dir, layer_path): + shell_command(["sudo", "tar", "-C", tmp_dir, "--xattrs", + "--xattrs-include='*'", "-xzf", layer_path], + False, True, True) + +def tar_file_search(archive, target): + out, err, returncode = shell_command(["tar", "-xf", archive, target, "-O"], + False, False, False) + return out + +def set_fattr(directory): + shell_command(["sudo", "setfattr", "-n", "trusted.overlay.opaque", + "-v", "y", directory], False, True, True) + +def make_whiteout_block_device(file_path, whiteout): + shell_command(["sudo", "mknod", "-m", "000", file_path, + "c", "0", "0"], False, True, True) + + out, err, returncode = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True) + perms = str(out).strip() + + shell_command(["sudo", "chown", perms, file_path], False, True, True) + +def convert_oci_whiteouts(tmp_dir): + out, err, returncode = shell_command(["sudo", "find", tmp_dir, "-name", ".wh.*"], + False, False, True) + whiteouts = str(out).splitlines() + for whiteout in whiteouts: + if whiteout == 0: + continue + basename = os.path.basename(whiteout) + directory = os.path.dirname(whiteout) + if basename == ".wh..wh..opq": + set_fattr(directory) + else: + whiteout_string = ".wh." + idx = basename.rfind(whiteout_string) + bname = basename[idx+len(whiteout_string):] + file_path = os.path.join(directory, bname) + make_whiteout_block_device(file_path, whiteout) + shell_command(["sudo", "rm", whiteout], False, True, True) + +def dir_to_squashfs(tmp_dir, squash_path): + shell_command(["sudo", "mksquashfs", tmp_dir, squash_path], + False, True, True) + +def upload_to_hdfs(file_path, file_name, hdfs_dir, replication, mode, force=False): + dest = hdfs_dir + "/" + file_name + + if does_hdfs_entry_exist(dest): + if not force: + logging.warn("Not uploading to HDFS. File already exists: %s", dest) + return + logging.info("File already exists, but overwriting due to force option: %s", dest) + + hdfs_put(file_path, dest, force) + hdfs_setrep(replication, dest) + hdfs_chmod(mode, dest) + logging.info("Uploaded file %s with replication %d and permissions %s", + dest, replication, mode) + +def upload_image_tag_to_hash_to_hdfs(file_path, file_name, hdfs_dir, + replication, + image_tag_to_hash_hash): + local_hash = calculate_file_hash(file_path) + if local_hash == image_tag_to_hash_hash: + logging.info("image_tag_to_hash file unchanged. 
Not uploading") + return + hdfs_file_path = hdfs_dir + "/" + file_name + try: + hdfs_put(file_path, hdfs_file_path, force=True) + hdfs_setrep(replication, hdfs_file_path) + hdfs_chmod("444", hdfs_file_path) + + except: + raise Exception("image tag to hash file upload failed") + +def docker_to_squash(layer_dir, layer, working_dir): + tmp_dir = os.path.join(working_dir, "expand_archive_" + layer) + layer_path = os.path.join(layer_dir, layer) + squash_path = layer_path + ".sqsh" + + if os.path.isdir(tmp_dir): + raise Exception("tmp_dir already exists. Please delete and try again " + + "Directory: " + tmp_dir) + os.makedirs(tmp_dir) + + try: + untar_layer(tmp_dir, layer_path) + convert_oci_whiteouts(tmp_dir) + dir_to_squashfs(tmp_dir, squash_path) + finally: + os.remove(layer_path) + shell_command(["sudo", "rm", "-rf", tmp_dir], + False, True, True) + + +def check_image_for_magic_file(magic_file, skopeo_dir, layers): + logging.debug("Searching for magic file %s", magic_file) + for layer in layers: + ret = tar_file_search(os.path.join(skopeo_dir, layer), magic_file) + if ret: + logging.debug("Found magic file %s in layer %s", magic_file, layer) + logging.debug("Magic file %s has contents:\n%s", magic_file, ret) + return ret + raise Exception("Magic file %s doesn't exist in any layer" % (magic_file)) + +def pull_build_push_update(args): + skopeo_format = args.skopeo_format + pull_format = args.pull_format + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + replication = args.replication + force = args.force + images_and_tags = args.images_and_tags + check_magic_file = args.check_magic_file + magic_file = args.magic_file + + hdfs_layers_dir = hdfs_root + "/layers" + hdfs_config_dir = hdfs_root + "/config" + hdfs_manifest_dir = hdfs_root + "/manifests" + working_dir = None + + + try: + working_dir = get_working_dir(args.working_dir) + local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash)) + setup_hdfs_dir(hdfs_root) + setup_squashfs_hdfs_dirs(hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir) + hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root, + image_tag_to_hash, + local_image_tag_to_hash) + + for image_and_tag_arg in images_and_tags: + image, tags = split_image_and_tag(image_and_tag_arg) + if not image or not tags: + raise Exception("Positional parameter requires an image and at least 1 tag: " + + image_and_tag_arg) + + logging.info("Working on image %s with tags %s", image, str(tags)) + manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image) + + layers = get_layer_hashes_from_manifest(manifest) + + config_hash = get_config_hash_from_manifest(manifest) + + logging.debug("Layers: %s", str(layers)) + logging.debug("Config: %s", str(config_hash)) + + update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, + manifest_hash, image) + + all_layers_exist = True + + if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash): + all_layers_exist = False + + if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash): + all_layers_exist = False + + for layer in layers: + hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh" + if not does_hdfs_entry_exist(hdfs_squash_path): + all_layers_exist = False + break + + if all_layers_exist: + if not force: + logging.info("All layers exist in HDFS, skipping this image") + continue + logging.info("All layers exist in HDFS, but force option set, so overwriting image") + + skopeo_dir = os.path.join(working_dir, 
image.split("/")[-1]) + logging.debug("skopeo_dir: %s", skopeo_dir) + + skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir) + + if check_magic_file: + check_image_for_magic_file(magic_file, skopeo_dir, layers) + + for layer in layers: + logging.info("Squashifying and uploading layer: %s", layer) + hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh" + if does_hdfs_entry_exist(hdfs_squash_path): + if force: + logging.info("Layer already exists, but overwriting due to force" + + "option: %s", layer) + else: + logging.info("Layer exists. Skipping and not squashifying or" + + "uploading: %s", layer) + continue + + docker_to_squash(skopeo_dir, layer, working_dir) + squash_path = os.path.join(skopeo_dir, layer + ".sqsh") + squash_name = os.path.basename(squash_path) + upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force) + + + config_local_path = os.path.join(skopeo_dir, config_hash) + upload_to_hdfs(config_local_path, + os.path.basename(config_local_path), + hdfs_config_dir, replication, "444", force) + + manifest_local_path = os.path.join(skopeo_dir, "manifest.json") + upload_to_hdfs(manifest_local_path, manifest_hash, + hdfs_manifest_dir, replication, "444", force) + + write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags) + upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash, + hdfs_root, replication, + image_tag_to_hash_hash) + finally: + if working_dir: + if os.path.isdir(working_dir): + shell_command(["sudo", "rm", "-rf", working_dir], + False, True, True) + +def pull_build(args): + skopeo_format = args.skopeo_format + pull_format = args.pull_format + images_and_tags = args.images_and_tags + check_magic_file = args.check_magic_file + magic_file = args.magic_file + + for image_and_tag_arg in images_and_tags: + image, tags = split_image_and_tag(image_and_tag_arg) + if not image or not tags: + raise Exception("Positional parameter requires an image and at least 1 tag: " + + image_and_tag_arg) + + logging.info("Working on image %s with tags %s", image, str(tags)) + manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image) + + layers = get_layer_hashes_from_manifest(manifest) + + config_hash = get_config_hash_from_manifest(manifest) + + logging.debug("Layers: %s", str(layers)) + logging.debug("Config: %s", str(config_hash)) + + + try: + working_dir = get_working_dir(args.working_dir) + skopeo_dir = os.path.join(working_dir, image.split("/")[-1]) + logging.debug("skopeo_dir: %s", skopeo_dir) + skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir) + + if check_magic_file: + check_image_for_magic_file(magic_file, skopeo_dir, layers) + + for layer in layers: + logging.info("Squashifying layer: %s", layer) + docker_to_squash(skopeo_dir, layer, working_dir) + + except: + if os.path.isdir(skopeo_dir): + shutil.rmtree(skopeo_dir) + raise + +def push_update(args): + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + replication = args.replication + force = args.force + images_and_tags = args.images_and_tags + + hdfs_layers_dir = hdfs_root + "/layers" + hdfs_config_dir = hdfs_root + "/config" + hdfs_manifest_dir = hdfs_root + "/manifests" + local_image_tag_to_hash = None + + try: + working_dir = get_working_dir(args.working_dir) + local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash)) + setup_hdfs_dir(hdfs_root) + setup_squashfs_hdfs_dirs(hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir) + hash_to_tags, tag_to_hash, 
image_tag_to_hash_hash = populate_tag_dicts(hdfs_root, + image_tag_to_hash, + local_image_tag_to_hash) + + for image_and_tag_arg in images_and_tags: + image, tags = split_image_and_tag(image_and_tag_arg) + if not image or not tags: + raise Exception("Positional parameter requires an image and at least 1 tag: " + + image_and_tag_arg) + + logging.info("Working on image %s with tags %s", image, str(tags)) + skopeo_dir = os.path.join(working_dir, image.split("/")[-1]) + if not os.path.exists(skopeo_dir): + raise Exception("skopeo_dir doesn't exists: %s" % (skopeo_dir)) + manifest, manifest_hash = get_local_manifest_from_path(skopeo_dir + "/manifest.json") + + layers = get_layer_hashes_from_manifest(manifest) + + config_hash = get_config_hash_from_manifest(manifest) + + logging.debug("Layers: %s", str(layers)) + logging.debug("Config: %s", str(config_hash)) + + update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, + manifest_hash, image) + + all_layers_exist = True + + if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash): + all_layers_exist = False + + if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash): + all_layers_exist = False + + for layer in layers: + hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh" + if not does_hdfs_entry_exist(hdfs_squash_path): + all_layers_exist = False + break + + if all_layers_exist: + if not force: + logging.info("All layers exist in HDFS, skipping this image") + continue + logging.info("All layers exist in HDFS, but force option set, so overwriting image") + + for layer in layers: + hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh" + if does_hdfs_entry_exist(hdfs_squash_path): + if force: + logging.info("Layer already exists, but overwriting due to force" + + "option: %s", layer) + else: + logging.info("Layer exists. 
Skipping and not squashifying or" + + "uploading: %s", layer) + continue + + squash_path = os.path.join(skopeo_dir, layer + ".sqsh") + squash_name = os.path.basename(squash_path) + upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force) + + + config_local_path = os.path.join(skopeo_dir, config_hash) + upload_to_hdfs(config_local_path, + os.path.basename(config_local_path), + hdfs_config_dir, replication, "444", force) + + manifest_local_path = os.path.join(skopeo_dir, "manifest.json") + upload_to_hdfs(manifest_local_path, manifest_hash, + hdfs_manifest_dir, replication, "444", force) + + write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags) + upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash, + hdfs_root, replication, + image_tag_to_hash_hash) + finally: + if local_image_tag_to_hash: + if os.path.isfile(local_image_tag_to_hash): + os.remove(local_image_tag_to_hash) + + +def remove_image(args): + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + replication = args.replication + images_or_tags = args.images_or_tags + working_dir = None + + try: + working_dir = get_working_dir(args.working_dir) + local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash)) + hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root, + image_tag_to_hash, + local_image_tag_to_hash) + + logging.debug("hash_to_tags: %s", str(hash_to_tags)) + logging.debug("tag_to_hash: %s", str(tag_to_hash)) + + hdfs_layers_dir = hdfs_root + "/layers" + hdfs_config_dir = hdfs_root + "/config" + hdfs_manifest_dir = hdfs_root + "/manifests" + + delete_list = [] + + known_images, err, returncode = hdfs_ls(hdfs_manifest_dir, "-C", False, False, False) + known_images = known_images.split() + + logging.debug("known_images:\n%s", known_images) + + layers_to_keep = [] + + images_and_tags_to_remove = [] + images_to_remove = [] + for image_or_tag_arg in images_or_tags: + images_and_tags_to_remove.extend(image_or_tag_arg.split(",")) + + logging.debug("images_and_tags_to_remove:\n%s", images_and_tags_to_remove) + + if isinstance(images_and_tags_to_remove, Iterable): + for image in images_and_tags_to_remove: + if is_sha256_hash(image): + image_hash = image + else: + image_hash = tag_to_hash.get(image, None) + if image_hash: + images_to_remove.append(hdfs_manifest_dir + "/" + image_hash) + else: + image = images_and_tags_to_remove[0] + if is_sha256_hash(image): + image_hash = image + else: + image_hash = tag_to_hash.get(image, None) + if image_hash: + images_to_remove.append(hdfs_manifest_dir + "/" + image_hash) + + logging.debug("images_to_remove:\n%s", images_to_remove) + if not images_to_remove: + logging.warn("No images to remove") + return + + for image in known_images: + if image not in images_to_remove: + manifest, manifest_hash = get_hdfs_manifest_from_path(image) + layers = get_layer_hashes_from_manifest(manifest) + layers_to_keep.extend(layers) + + logging.debug("layers_to_keep:\n%s", layers_to_keep) + + for image_or_tag_arg in images_or_tags: + images = image_or_tag_arg.split(",") + for image in images: + logging.info("removing image: %s", image) + if is_sha256_hash(image): + logging.debug("image is sha256") + image_hash = image + else: + image_hash = tag_to_hash.get(image, None) + if image_hash: + logging.debug("image tag exists for %s", image) + else: + logging.info("Not removing %s. 
Image tag doesn't exist", image) + continue + manifest_path = hdfs_manifest_dir + "/" + image_hash + if does_hdfs_entry_exist(manifest_path): + logging.debug("image manifest for %s exists: %s", image, manifest_path) + else: + logging.info("Not removing %s. Image manifest doesn't exist: %s", image, manifest_path) + continue + + delete_list.append(manifest_path) + + manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path) + + config_hash = get_config_hash_from_manifest(manifest) + logging.debug("config_hash: %s", config_hash) + + delete_list.append(hdfs_config_dir + "/" + config_hash) + + layers = get_layer_hashes_from_manifest(manifest) + layers_paths = [] + for layer in layers: + if layer not in layers_to_keep: + layers_paths.append(hdfs_layers_dir + "/" + layer + ".sqsh") + delete_list.extend(layers_paths) + + logging.debug("delete_list: %s", delete_list) + + remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash) + + write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags) + upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash, + hdfs_root, replication, + image_tag_to_hash_hash) + + hdfs_rm(delete_list) + + finally: + if working_dir: + if os.path.isdir(working_dir): + shutil.rmtree(working_dir) + +def add_remove_tag(args): + pull_format = args.pull_format + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + replication = args.replication + sub_command = args.sub_command + images_and_tags = args.images_and_tags + + hdfs_manifest_dir = hdfs_root + "/manifests" + working_dir = None + + try: + working_dir = get_working_dir(args.working_dir) + local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash)) + hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root, + image_tag_to_hash, + local_image_tag_to_hash) + + for image_and_tag_arg in images_and_tags: + if sub_command == "add-tag": + image, tags = split_image_and_tag(image_and_tag_arg) + if is_sha256_hash(image): + manifest_hash = image + else: + manifest_hash = tag_to_hash.get(image, None) + + if manifest_hash: + manifest_path = hdfs_manifest_dir + "/" + manifest_hash + out, err, returncode = hdfs_cat(manifest_path) + manifest = json.loads(out) + logging.debug("image tag exists for %s", image) + else: + manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image) + + update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, + manifest_hash, image) + + elif sub_command == "remove-tag": + tags = image_and_tag_arg.split(",") + image = None + manifest = None + manifest_hash = 0 + remove_from_dicts(hash_to_tags, tag_to_hash, tags) + else: + raise Exception("Invalid sub_command: %s" % (sub_command)) + + write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags) + upload_image_tag_to_hash_to_hdfs(local_image_tag_to_hash, image_tag_to_hash, + hdfs_root, replication, + image_tag_to_hash_hash) + finally: + if working_dir: + if os.path.isdir(working_dir): + shutil.rmtree(working_dir) + +def copy_update(args): + image_tag_to_hash = args.image_tag_to_hash + replication = args.replication + force = args.force + src_root = args.src_root + dest_root = args.dest_root + images_and_tags = args.images_and_tags + + src_layers_dir = src_root + "/layers" + src_config_dir = src_root + "/config" + src_manifest_dir = src_root + "/manifests" + dest_layers_dir = dest_root + "/layers" + dest_config_dir = dest_root + "/config" + dest_manifest_dir = dest_root + "/manifests" + + setup_hdfs_dir(dest_root) + 
setup_squashfs_hdfs_dirs(dest_layers_dir, dest_config_dir, dest_manifest_dir) + working_dir = None + + try: + working_dir = get_working_dir(args.working_dir) + local_src_image_tag_to_hash = os.path.join(working_dir, "src-" + + os.path.basename(image_tag_to_hash)) + local_dest_image_tag_to_hash = os.path.join(working_dir, "dest-" + + os.path.basename(image_tag_to_hash)) + + src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts(src_root, image_tag_to_hash, local_src_image_tag_to_hash) + dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts(dest_root, image_tag_to_hash, local_dest_image_tag_to_hash) + + for image_and_tag_arg in images_and_tags: + image, tags = split_image_and_tag(image_and_tag_arg) + if not image: + raise Exception("Positional parameter requires an image: " + image_and_tag_arg) + if not tags: + logging.debug("Tag not given. Using image tag instead: %s", image) + tags = [image] + + src_manifest_hash = src_tag_to_hash.get(image, None) + if not src_manifest_hash: + logging.info("Manifest not found for image %s. Skipping", image) + continue + + src_manifest_path = src_manifest_dir + "/" + src_manifest_hash + dest_manifest_path = dest_manifest_dir + "/" + src_manifest_hash + src_manifest, src_manifest_hash = get_hdfs_manifest_from_path(src_manifest_path) + + src_config_hash = get_config_hash_from_manifest(src_manifest) + src_config_path = src_config_dir + "/" + src_config_hash + dest_config_path = dest_config_dir + "/" + src_config_hash + + src_layers = get_layer_hashes_from_manifest(src_manifest) + src_layers_paths = [src_layers_dir + "/" + layer + ".sqsh" for layer in src_layers] + dest_layers_paths = [dest_layers_dir + "/" + layer + ".sqsh" for layer in src_layers] + + logging.debug("Copying Manifest: %s", str(src_manifest_path)) + logging.debug("Copying Layers: %s", str(src_layers_paths)) + logging.debug("Copying Config: %s", str(src_config_hash)) + + hdfs_cp(src_layers_paths, dest_layers_dir, force) + hdfs_cp(src_config_path, dest_config_dir, force) + hdfs_cp(src_manifest_path, dest_manifest_dir, force) + + hdfs_setrep(replication, dest_layers_paths) + hdfs_setrep(replication, dest_config_path) + hdfs_setrep(replication, dest_manifest_path) + + hdfs_chmod("444", dest_layers_paths) + hdfs_chmod("444", dest_config_path) + hdfs_chmod("444", dest_manifest_path) + + for tag in tags: + new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None) + if new_tags_and_comments: + comment = ', '.join(map(str, new_tags_and_comments[1])) + if comment is None: + comment = image + + update_dicts(dest_hash_to_tags, dest_tag_to_hash, tag, src_manifest_hash, comment) + + write_local_image_tag_to_hash(local_dest_image_tag_to_hash, dest_hash_to_tags) + upload_image_tag_to_hash_to_hdfs(local_dest_image_tag_to_hash, image_tag_to_hash, + dest_root, replication, + dest_image_tag_to_hash_hash) + + finally: + if working_dir: + if os.path.isdir(working_dir): + shutil.rmtree(working_dir) + +def query_tag(args): + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + tags = args.tags + working_dir = None + + try: + working_dir = get_working_dir(args.working_dir) + local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash)) + hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root, + image_tag_to_hash, + local_image_tag_to_hash) + + logging.debug("hash_to_tags: %s", str(hash_to_tags)) + logging.debug("tag_to_hash: %s", str(tag_to_hash)) + + hdfs_layers_dir = 
hdfs_root + "/layers" + hdfs_config_dir = hdfs_root + "/config" + hdfs_manifest_dir = hdfs_root + "/manifests" + + for tag in tags: + image_hash = tag_to_hash.get(tag, None) + if not image_hash: + logging.info("image hash mapping doesn't exist for tag %s", tag) + continue + + manifest_path = hdfs_manifest_dir + "/" + image_hash + if does_hdfs_entry_exist(manifest_path): + logging.debug("image manifest for %s exists: %s", tag, manifest_path) + else: + logging.info("Image manifest for %s doesn't exist: %s", tag, manifest_path) + continue + + manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path) + layers = get_layer_hashes_from_manifest(manifest) + config_hash = get_config_hash_from_manifest(manifest) + config_path = hdfs_config_dir + "/" + config_hash + + layers_paths = [hdfs_layers_dir + "/" + layer + ".sqsh" for layer in layers] + + logging.info("Image info for '%s'", tag) + logging.info(manifest_path) + logging.info(config_path) + for layer in layers_paths: + logging.info(layer) + + finally: + if working_dir: + if os.path.isdir(working_dir): + shutil.rmtree(working_dir) + +def list_tags(args): + hdfs_root = args.hdfs_root + image_tag_to_hash = args.image_tag_to_hash + + hdfs_image_tag_to_hash = hdfs_root + "/" + image_tag_to_hash + if does_hdfs_entry_exist(hdfs_image_tag_to_hash): + hdfs_cat(hdfs_image_tag_to_hash, True, True, False) + else: + logging.error("image-tag-to-hash file doesn't exist: %s", hdfs_image_tag_to_hash) + +def create_parsers(): + parser = argparse.ArgumentParser() + add_parser_default_arguments(parser) + + subparsers = parser.add_subparsers(help='sub help', dest='sub_command') + + parse_pull_build_push_update = subparsers.add_parser('pull-build-push-update', + help='Pull an image, build its squashfs' + + 'layers, push it to hdfs, and' + + 'atomically update the' + + 'image-tag-to-hash file') + parse_pull_build_push_update.set_defaults(func=pull_build_push_update) + add_parser_default_arguments(parse_pull_build_push_update) + parse_pull_build_push_update.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify multiple)") + + parse_pull_build = subparsers.add_parser('pull-build', + help='Pull an image and build its squashfs layers') + parse_pull_build .set_defaults(func=pull_build) + add_parser_default_arguments(parse_pull_build) + parse_pull_build.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify multiple)") + + parse_push_update = subparsers.add_parser('push-update', + help='Push the squashfs layers to hdfs and update' + + 'the image-tag-to-hash file') + parse_push_update.set_defaults(func=push_update) + add_parser_default_arguments(parse_push_update) + parse_push_update.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify multiple)") + + parse_remove_image = subparsers.add_parser('remove-image', + help='Remove an image (manifest, config, layers)' + + 'from hdfs based on its tag or manifest hash') + parse_remove_image.set_defaults(func=remove_image) + add_parser_default_arguments(parse_remove_image) + parse_remove_image.add_argument("images_or_tags", nargs="+", + help="Image or tag argument (can specify multiple)") + + parse_remove_tag = subparsers.add_parser('remove-tag', + help='Remove an image to tag mapping in the' + + 'image-tag-to-hash file') + parse_remove_tag.set_defaults(func=add_remove_tag) + add_parser_default_arguments(parse_remove_tag) + parse_remove_tag.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify 
multiple)") + + parse_add_tag = subparsers.add_parser('add-tag', + help='Add an image to tag mapping in the' + + 'image-tag-to-hash file') + parse_add_tag.set_defaults(func=add_remove_tag) + add_parser_default_arguments(parse_add_tag) + parse_add_tag.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify multiple)") + + parse_copy_update = subparsers.add_parser('copy-update', + help='Copy an image from hdfs in one cluster to' + + 'another and then update the' + + 'image-tag-to-hash file') + parse_copy_update.set_defaults(func=copy_update) + add_parser_default_arguments(parse_copy_update) + parse_copy_update.add_argument("src_root", + help="HDFS path for source root directory") + parse_copy_update.add_argument("dest_root", + help="HDFS path for destination root directory") + parse_copy_update.add_argument("images_and_tags", nargs="+", + help="Image and tag argument (can specify multiple)") + + parse_query_tag = subparsers.add_parser('query-tag', + help='Get the manifest, config, and layers' + + 'associated with a tag') + parse_query_tag.set_defaults(func=query_tag) + add_parser_default_arguments(parse_query_tag) + parse_query_tag.add_argument("tags", nargs="+", + help="Image or tag argument (can specify multiple)") + + parse_list_tags = subparsers.add_parser('list-tags', + help='List all tags in image-tag-to-hash file') + parse_list_tags.set_defaults(func=list_tags) + add_parser_default_arguments(parse_list_tags) + + return parser + +def add_parser_default_arguments(parser): + parser.add_argument("--working-dir", type=str, dest='working_dir', default="dts-work-dir", + help="Name of working directory") + parser.add_argument("--skopeo-format", type=str, dest='skopeo_format', + default='dir', help="Output format for skopeo copy") + parser.add_argument("--pull-format", type=str, dest='pull_format', + default='docker', help="Pull format for skopeo") + parser.add_argument("-l", "--log", type=str, dest='LOG_LEVEL', + default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)") + parser.add_argument("--hdfs-root", type=str, dest='hdfs_root', + default='/runc-root', help="The root directory in HDFS for all of the" + + "squashfs images") + parser.add_argument("--image-tag-to-hash", type=str, + dest='image_tag_to_hash', default='image-tag-to-hash', + help="image-tag-to-hash filename in hdfs") + parser.add_argument("-r", "--replication", type=int, dest='replication', + default=1, help="Replication factor for all files uploaded to HDFS") + parser.add_argument("--hadoop-prefix", type=str, dest='hadoop_prefix', + default=os.environ.get('HADOOP_PREFIX'), + help="hadoop prefix value for environment") + parser.add_argument("-f", "--force", dest='force', + action="store_true", default=False, help="Force overwrites in HDFS") + parser.add_argument("--check-magic-file", dest='check_magic_file', + action="store_true", default=False, help="Check for a specific magic file" + + "in the image before uploading") + parser.add_argument("--magic-file", type=str, dest='magic_file', + default='etc/dockerfile-version', help="The magic file to check for" + + "in the image") + return parser + +def check_dependencies(): + try: + command = [HADOOP_BIN_DIR + "/hadoop", "version"] + out, err, returncode = shell_command(command, False, False, True) + except: + logging.error("Could not find hadoop. 
Make sure HADOOP_PREFIX " + + "is set correctly either in your environment or on the command line via " + + "via --hadoop-prefix") + return 1 + + try: + command = ["skopeo", "-v"] + out, err, returncode = shell_command(command, False, False, True) + except: + logging.error("Could not find skopeo. Make sure it is installed and present " + + "on the PATH") + return 1 + + try: + command = ["mksquashfs", "-version"] + out, err, returncode = shell_command(command, False, False, True) + except: + logging.error("Could not find mksquashfs. Make sure squashfs-tools is installed " + + "and mksquashfs is present on the the PATH") + return 1 + +def main(): + global LOG_LEVEL + global HADOOP_BIN_DIR + parser = create_parsers() + args, extra = parser.parse_known_args() + + if extra: + raise Exception("Extra unknown arguments given: %s" % (extra)) + + LOG_LEVEL = args.LOG_LEVEL.upper() + image_tag_to_hash = args.image_tag_to_hash + working_dir = args.working_dir + + numeric_level = getattr(logging, LOG_LEVEL, None) + if not isinstance(numeric_level, int): + logging.error("Invalid log level: %s", LOG_LEVEL) + return + logging.basicConfig(format="%(levelname)s: %(message)s", level=numeric_level) + + if args.hadoop_prefix is None: + logging.error("Hadoop prefix is not set. You may set it either " + + "in your environment or via --hadoop-prefix") + return + + HADOOP_BIN_DIR = args.hadoop_prefix + "/bin" + + if check_dependencies(): + return + + if "/" in image_tag_to_hash: + logging.error("image-tag-to-hash cannot contain a /") + return + + logging.debug("args: %s", str(args)) + logging.debug("extra: %s", str(extra)) + logging.debug("image-tag-to-hash: %s", image_tag_to_hash) + logging.debug("LOG_LEVEL: %s", LOG_LEVEL) + logging.debug("HADOOP_BIN_DIR: %s", str(HADOOP_BIN_DIR)) + + args.func(args) + +if __name__ == "__main__": + main()