diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index a663409b21..23087392c2 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -159,6 +159,17 @@ jobs:
           exit "0"
         fi
 
+  snapshots:
+    name: "🔍 Check for valid snapshot urls"
+    runs-on: ubuntu-20.04
+    steps:
+      - name: Check out code into the Go module directory
+        uses: actions/checkout@v3
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+      - name: Check for valid snapshot urls
+        run: ./tools/check-snapshots --errors-only .
+
   shellcheck:
     name: "🐚 Shellcheck"
     runs-on: ubuntu-20.04
diff --git a/tools/check-snapshots b/tools/check-snapshots
new file mode 100755
index 0000000000..a990407a07
--- /dev/null
+++ b/tools/check-snapshots
@@ -0,0 +1,180 @@
+#!/usr/bin/python3
+"""check-snapshots greps the directory tree for rpmrepo urls and checks them
+against the current snapshot list"""
+
+import argparse
+import json
+import os
+import sys
+import subprocess
+import time
+from urllib.parse import urlparse
+
+import requests
+
+SNAPSHOTS_URL = "https://rpmrepo.osbuild.org/v2/enumerate"
+SNAPSHOTS_TIMEOUT = 2 * 60
+SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"]
+
+def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT):
+    """Get the list of snapshots from the rpmrepo API"""
+    print(f"Fetching list of snapshots from {url}")
+    start = time.time()
+    try:
+        r = requests.get(url, timeout=timeout)
+    except requests.RequestException:
+        return None
+    elapsed = time.time() - start
+    if r.status_code != 200:
+        print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}")
+        return None
+    print(f"Received snapshot list in {elapsed:0.0f}s")
+    return r.json()
+
+
+def find_snapshot_urls(directory):
+    """grep the directory for rpmrepo snapshot urls
+
+    Returns a map of urls to the files they are used in.
+    """
+    urls = {}
+    try:
+        grep_out = subprocess.run(SNAPSHOT_GREP + [directory],
+                                  check=True,
+                                  capture_output=True,
+                                  env={"LANG": "C"})
+    except subprocess.CalledProcessError as e:
+        print("ERROR: " + e.stderr.decode("utf-8"))
+        sys.exit(1)
+
+    for line in grep_out.stdout.decode("utf-8").splitlines():
+        try:
+            file, url = line.split(":", 1)
+        except ValueError:
+            print(f"Problem parsing {line}")
+            continue
+        url = url.strip()
+        if url not in urls:
+            urls[url] = [file]
+        else:
+            urls[url].append(file)
+
+    return urls
+
+
+def check_baseurl(repo, snapshots):
+    """Check the baseurl to see if it is a valid snapshot, and if there is a newer one
+    available.
+    """
+    invalid = None
+    newer = None
+    url = urlparse(repo)
+    snapshot = os.path.basename(url.path)
+
+    # Is this snapshot valid?
+    if snapshot not in snapshots:
+        invalid = f"{snapshot} is not a valid snapshot name"
+
+    # Is this snapshot old?
+    base = snapshot.rsplit("-", 1)[0]
+    newest = snapshot
+    for s in snapshots:
+        if s.rsplit("-", 1)[0] != base:
+            continue
+        if s > newest:
+            newest = s
+    if newest != snapshot:
+        newer = f"{snapshot} has a newer version - {newest}"
+
+    return invalid, newer
+
+
+def check_snapshot_urls(urls, snapshots, skip=("test/data/manifests",), errors_only=False):
+    """check the urls against the current list of snapshots
+
+    Returns:
+        0 if all were valid and no newer snapshots are available
+        2 if there were invalid snapshots
+        4 if there were newer snapshots
+        6 if there were invalid and newer snapshots
+    """
+    # Gather up the messages for each file
+    messages = {}
+    ret = 0
+    for url in urls:
+        invalid, newer = check_baseurl(url, snapshots)
+        if invalid:
+            # Add this to each file's invalid message list
+            for f in urls[url]:
+                if any(s in f for s in skip):
+                    continue
+                ret |= 2
+                if f in messages:
+                    if invalid not in messages[f]["invalid"]:
+                        messages[f]["invalid"].append(invalid)
+                else:
+                    messages[f] = {"invalid": [invalid], "newer": []}
+
+        if errors_only:
+            continue
+
+        if newer:
+            # Add this to each file's newer message list
+            for f in urls[url]:
+                if any(s in f for s in skip):
+                    continue
+                ret |= 4
+                if f in messages:
+                    if newer not in messages[f]["newer"]:
+                        messages[f]["newer"].append(newer)
+                else:
+                    messages[f] = {"newer": [newer], "invalid": []}
+
+    # Print the messages for each file
+    for f in messages:
+        print(f"{f}:")
+        for msg in messages[f]["invalid"]:
+            print(f"    ERROR: {msg}")
+        for msg in messages[f]["newer"]:
+            print(f"    NEWER: {msg}")
+
+    return ret
+
+
+# parse cmdline args
+def parse_args():
+    parser = argparse.ArgumentParser(description="Check snapshot urls")
+    parser.add_argument("--verbose", action="store_true")
+    parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT,
+                        help="How long to wait for rpmrepo snapshot list")
+    parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots")
+    parser.add_argument("--url", default=SNAPSHOTS_URL,
+                        help="URL to use for the list of rpmrepo snapshots")
+    parser.add_argument("--errors-only", action="store_true",
+                        help="Only return errors")
+    parser.add_argument("directory")
+    return parser.parse_args()
+
+def main():
+    args = parse_args()
+    urls = find_snapshot_urls(args.directory)
+
+    snapshots = None
+    if args.cache:
+        try:
+            with open(args.cache, encoding="utf8") as f:
+                snapshots = json.load(f)
+        except OSError:
+            print(f"No snapshots cache found at {args.cache}")
+            sys.exit(1)
+    else:
+        snapshots = fetch_snapshots_api(args.url, args.timeout)
+        if not snapshots:
+            print(f"Cannot download snapshots from {args.url}")
+            sys.exit(1)
+
+    return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only)
+
+
+if __name__ == "__main__":
+    sys.exit(main())
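The newer-snapshot check in check_baseurl works because snapshot names end in a zero-padded date suffix (e.g. -20220301), so plain string comparison between names that share the same base orders them chronologically. A minimal sketch of that idea, with invented snapshot names:

    #!/usr/bin/python3
    # Sketch of the comparison logic used by check_baseurl; the names are made up.
    snapshots = [
        "example-base-20220301",
        "example-base-20221107",
        "other-base-20220815",
    ]

    def newest(name, names):
        """Return the newest entry sharing name's base (everything before the date)."""
        base = name.rsplit("-", 1)[0]
        return max((s for s in names if s.rsplit("-", 1)[0] == base), default=name)

    print(newest("example-base-20220301", snapshots))  # example-base-20221107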
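Because check_snapshot_urls returns a bitmask (2 for invalid urls, 4 for newer snapshots, 6 for both), a caller can tell the two conditions apart from the exit status alone. A hypothetical wrapper, run from the repository root like the CI step above:

    #!/usr/bin/python3
    # Hypothetical wrapper around the tool; only the exit status is inspected.
    import subprocess

    result = subprocess.run(["./tools/check-snapshots", "."], check=False)
    if result.returncode & 2:
        print("some urls do not name valid snapshots")
    if result.returncode & 4:
        print("newer snapshots are available for some urls")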