From 0d5940b07c274f1652af15e710442e87eb2578a0 Mon Sep 17 00:00:00 2001 From: Jaekwon Bang Date: Fri, 10 Feb 2023 10:55:01 +0900 Subject: [PATCH] Be available to download yocto url --- src/fosslight_util/download.py | 133 ++++++++++++++++++----------- src/fosslight_util/help.py | 20 +++++ src/fosslight_util/parsing_yaml.py | 5 +- 3 files changed, 108 insertions(+), 50 deletions(-) diff --git a/src/fosslight_util/download.py b/src/fosslight_util/download.py index 291273d..d8480f1 100755 --- a/src/fosslight_util/download.py +++ b/src/fosslight_util/download.py @@ -8,13 +8,14 @@ import tarfile import zipfile import logging -import getopt +import argparse import shutil import pygit2 as git import bz2 from datetime import datetime from pathlib import Path -from ._get_downloadable_url import get_downloadable_url +from fosslight_util._get_downloadable_url import get_downloadable_url +from fosslight_util.help import print_help_msg_download import fosslight_util.constant as constant from fosslight_util.set_log import init_log import signal @@ -48,37 +49,59 @@ def alarm_handler(signum, frame): raise TimeOutException() -def print_help_msg(): - print("* Required : -s link_to_download") - print("* Optional : -t target_directory") - print("* Optional : -d log_file_directory") - sys.exit(0) +def change_src_link_to_https(src_link): + src_link = src_link.replace("git://", "https://") + if src_link.endswith(".git"): + src_link = src_link.replace('.git', '') + return src_link + + +def parse_src_link(src_link): + src_info = {} + src_link_changed = "" + if src_link.startswith("git://") or src_link.startswith("https://") or src_link.startswith("http://"): + src_link_split = src_link.split(';') + if src_link.startswith("git://github.com/"): + src_link_changed = change_src_link_to_https(src_link_split[0]) + else: + src_link_changed = src_link_split[0] + + branch_info = [s for s in src_link_split if s.startswith('branch')] + tag_info = [s for s in src_link_split if s.startswith('tag')] + + src_info["url"] = src_link_changed + src_info["branch"] = branch_info + src_info["tag"] = tag_info + return src_info def main(): + parser = argparse.ArgumentParser(description='FOSSLight Downloader', prog='fosslight_download', add_help=False) + parser.add_argument('-h', '--help', help='Print help message', action='store_true', dest='help') + parser.add_argument('-s', '--source', help='Source link to download', type=str, dest='source') + parser.add_argument('-t', '--target_dir', help='Target directory', type=str, dest='target_dir', default="") + parser.add_argument('-d', '--log_dir', help='Directory to save log file', type=str, dest='log_dir', default="") src_link = "" target_dir = os.getcwd() log_dir = os.getcwd() try: - argv = sys.argv[1:] - opts, args = getopt.getopt(argv, 'hs:t:d:') - except getopt.GetoptError: - print_help_msg() - - for opt, arg in opts: - if opt == "-h": - print_help_msg() - elif opt == "-s": - src_link = arg - elif opt == "-t": - target_dir = arg - elif opt == "-d": - log_dir = arg + args = parser.parse_args() + except SystemExit: + sys.exit(0) + + if args.help: + print_help_msg_download() + if args.source: + src_link = args.source + if args.target_dir: + target_dir = args.target_dir + if args.log_dir: + log_dir = args.log_dir if src_link == "": - print_help_msg() + print_help_msg_download() else: cli_download_and_extract(src_link, target_dir, log_dir) @@ -98,9 +121,14 @@ def cli_download_and_extract(link, target_dir, log_dir, checkout_to="", compress msg = "Need a link to download." elif os.path.isfile(target_dir): success = False - msg = "The target directory exists as a file.:"+target_dir + msg = f"The target directory exists as a file.: {target_dir}" else: - if not download_git_clone(link, target_dir, checkout_to): + src_info = parse_src_link(link) + link = src_info.get("url", "") + tag = ''.join(src_info.get("tag", "")).split('=')[-1] + branch = ''.join(src_info.get("branch", "")).split('=')[-1] + + if not download_git_clone(link, target_dir, checkout_to, tag, branch): if os.path.isfile(target_dir): shutil.rmtree(target_dir) @@ -111,7 +139,7 @@ def cli_download_and_extract(link, target_dir, log_dir, checkout_to="", compress success = False msg = str(error) - logger.info("* FOSSLight Downloader - Result :"+str(success)+"\n"+msg) + logger.info(f"\n* FOSSLight Downloader - Result: {success}\n {msg}") return success, msg @@ -131,11 +159,24 @@ def get_ref_to_checkout(checkout_to, ref_list): ref_to_checkout = next( x for x in ref_list if x.endswith(checkout_to)) except Exception as error: - logger.warning("git find ref - failed:"+str(error)) + logger.warning(f"git find ref - failed: {error}") + return ref_to_checkout + + +def decide_checkout(checkout_to="", tag="", branch=""): + if checkout_to != "": + ref_to_checkout = checkout_to + else: + if branch != "": + ref_to_checkout = branch + else: + ref_to_checkout = tag return ref_to_checkout -def download_git_clone(git_url, target_dir, checkout_to=""): +def download_git_clone(git_url, target_dir, checkout_to="", tag="", branch=""): + ref_to_checkout = decide_checkout(checkout_to, tag, branch) + if platform.system() != "Windows": signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(SIGNAL_TIMEOUT) @@ -152,18 +193,16 @@ def download_git_clone(git_url, target_dir, checkout_to=""): else: del alarm except Exception as error: - logger.warning("git clone - failed:"+str(error)) + logger.warning(f"git clone - failed: {error}") return False try: - ref_to_checkout = checkout_to - if checkout_to != "": + if ref_to_checkout != "": ref_list = [x for x in repo.references] - ref_to_checkout = get_ref_to_checkout(checkout_to, ref_list) - logger.info("git checkout :"+ref_to_checkout) + ref_to_checkout = get_ref_to_checkout(ref_to_checkout, ref_list) + logger.info(f"git checkout: {ref_to_checkout}") repo.checkout(ref_to_checkout) except Exception as error: - logger.warning("git checkout to "+ref_to_checkout + - " - failed:"+str(error)) + logger.warning(f"git checkout to {ref_to_checkout} - failed: {error}") return True @@ -194,33 +233,31 @@ def download_wget(link, target_dir, compressed_only): if not success: raise Exception('Not supported compression type (link:{0})'.format(link)) - logger.info("wget:"+link) - downloaded_file = wget.download(link) + logger.info(f"wget: {link}") + downloaded_file = wget.download(link, target_dir) if platform.system() != "Windows": signal.alarm(0) else: del alarm - shutil.move(downloaded_file, target_dir) - downloaded_file = os.path.join(target_dir, downloaded_file) if downloaded_file != "": success = True - logger.debug("wget - downloaded:"+downloaded_file) + logger.debug(f"wget - downloaded: {downloaded_file}") except Exception as error: success = False - logger.warning("wget - failed:"+str(error)) + logger.warning(f"wget - failed: {error}") return success, downloaded_file def extract_compressed_dir(src_dir, target_dir, remove_after_extract=True): - logger.debug("Extract Dir:"+src_dir) + logger.debug(f"Extract Dir: {src_dir}") try: files_path = [os.path.join(src_dir, x) for x in os.listdir(src_dir)] for fname in files_path: extract_compressed_file(fname, target_dir, remove_after_extract) except Exception as error: - logger.debug("Extract files in dir - failed:"+str(error)) + logger.debug(f"Extract files in dir - failed: {error}") return False return True @@ -248,15 +285,15 @@ def extract_compressed_file(fname, extract_path, remove_after_extract=True): decompress_bz2(fname, extract_path) else: is_compressed_file = False - logger.warning("Unsupported file extension:"+fname) + logger.warning(f"Unsupported file extension: {fname}") if remove_after_extract and is_compressed_file: - logger.debug("Remove - extracted file :"+fname) + logger.debug(f"Remove - extracted file: {fname}") os.remove(fname) else: - logger.warning("Not a file:"+fname) + logger.warning(f"Not a file: {fname}") except Exception as error: - logger.error("Extract - failed:"+str(error)) + logger.error(f"Extract - failed: {error}") return False return True @@ -268,7 +305,7 @@ def decompress_bz2(source_file, dest_path): open(os.path.splitext(source_file)[0], 'wb').write(data) # write a uncompressed file except Exception as error: - logger.error("Decompress bz2 - failed:"+str(error)) + logger.error(f"Decompress bz2 - failed: {error}") return False return True @@ -280,7 +317,7 @@ def unzip(source_file, dest_path): fzip.extract(filename, dest_path) fzip.close() except Exception as error: - logger.error("Unzip - failed:"+str(error)) + logger.error(f"Unzip - failed: {error}") return False return True diff --git a/src/fosslight_util/help.py b/src/fosslight_util/help.py index ebd59b9..18ec870 100644 --- a/src/fosslight_util/help.py +++ b/src/fosslight_util/help.py @@ -19,6 +19,21 @@ """ +_HELP_MESSAGE_DOWNLOAD = """ + FOSSLight Downloader is a tool to download the package via input URL + + Usage: fosslight_download [option1] [options2] + ex) fosslight_download -s http://github.com/fosslight/fosslight -t output_dir -d log_dir + + Required: + -s\t\t URL of the package to be downloaded + + Optional: + -h\t\t Print help message + -t\t\t Output path name + -d\t\t Directory name to save the log file""" + + class PrintHelpMsg(): message_suffix = "" @@ -41,3 +56,8 @@ def print_package_version(pkg_name, msg="", exitopt=True): if exitopt: sys.exit(0) + + +def print_help_msg_download(exitOpt=True): + helpMsg = PrintHelpMsg(_HELP_MESSAGE_DOWNLOAD) + helpMsg.print_help_msg(exitOpt) diff --git a/src/fosslight_util/parsing_yaml.py b/src/fosslight_util/parsing_yaml.py index 69fc49a..1d3ac9b 100644 --- a/src/fosslight_util/parsing_yaml.py +++ b/src/fosslight_util/parsing_yaml.py @@ -92,7 +92,7 @@ def find_sbom_yaml_files(path_to_find): return oss_pkg_files -def set_value_switch(oss, key, value, yaml_file): +def set_value_switch(oss, key, value, yaml_file=""): if key in ['oss name', 'name']: oss.name = value elif key in ['oss version', 'version']: @@ -116,4 +116,5 @@ def set_value_switch(oss, key, value, yaml_file): elif key == 'yocto_recipe': oss.yocto_recipe = value else: - _logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed") + if yaml_file != "": + _logger.debug(f"file:{yaml_file} - key:{key} cannot be parsed")