-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests: Add a check for valid snapshot urls
This pulls the list of snapshots from the rpmrepo API, greps the codebase for all uses of rpmrepo.osbuild.org that look like a snapshot name, and then checks to make sure they are still valid.
- Loading branch information
Showing
2 changed files
with
191 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
#!/usr/bin/python3 | ||
"""check-snapshots greps the directory tree for rpmrepo urls and checks them | ||
against the current snapshot list""" | ||
|
||
import argparse | ||
import json | ||
import os | ||
import sys | ||
import subprocess | ||
import time | ||
from urllib.parse import urlparse | ||
|
||
import requests | ||
|
||
SNAPSHOTS_URL="https://rpmrepo.osbuild.org/v2/enumerate" | ||
SNAPSHOTS_TIMEOUT = 2 * 60 | ||
SNAPSHOT_GREP = ["grep", "--color=never", "-or", r"http.*rpmrepo.osbuild.org.*-20[0-9]\+"] | ||
|
||
def fetch_snapshots_api(url, timeout=SNAPSHOTS_TIMEOUT): | ||
"""Get the list of snapshots from the rpmrepo API""" | ||
print(f"Fetching list of snapshots from {url}") | ||
start = time.time() | ||
try: | ||
r = requests.get(url, timeout=timeout) | ||
except: | ||
return None | ||
elapsed = time.time() - start | ||
if r.status_code != 200: | ||
print(f"HTTP Response {r.status_code} from {url} after {elapsed:0.0f}s: {r.text}") | ||
return None | ||
print(f"Received snapshot list in {elapsed:0.0f}s") | ||
return r.json() | ||
|
||
|
||
def find_snapshot_urls(directory): | ||
"""grep the directory for rpmrepo snapshot urls | ||
Returns a map of urls to the files they are used in. | ||
""" | ||
urls = {} | ||
try: | ||
grep_out = subprocess.run(SNAPSHOT_GREP + [directory], | ||
check=True, | ||
capture_output=True, | ||
env={"LANG": "C"}) | ||
except subprocess.CalledProcessError as e: | ||
print("ERROR: " + e.stderr.decode("utf-8")) | ||
sys.exit(1) | ||
|
||
for line in grep_out.stdout.decode("utf-8").splitlines(): | ||
try: | ||
file, url = line.split(":", 1) | ||
except ValueError: | ||
print(f"Problem parsing {line}") | ||
continue | ||
url = url.strip() | ||
if url not in urls: | ||
urls[url] = [file] | ||
else: | ||
urls[url].append(file) | ||
|
||
return urls | ||
|
||
|
||
def check_baseurl(repo, snapshots): | ||
"""Check the baseurl to see if it is a valid snapshot, and if there is a newer one | ||
available. | ||
""" | ||
invalid = None | ||
newer = None | ||
url = urlparse(repo) | ||
snapshot = os.path.basename(url.path) | ||
|
||
# Is this snapshot valid? | ||
if snapshot not in snapshots: | ||
invalid = f"{snapshot} is not a valid snapshot name" | ||
|
||
# is this snapshot old? | ||
base = snapshot.rsplit("-", 1)[0] | ||
newest = snapshot | ||
for s in snapshots: | ||
if s.rsplit("-", 1)[0] != base: | ||
continue | ||
if s > newest: | ||
newest = s | ||
if newest != snapshot: | ||
newer = f"{snapshot} has a newer version - {newest}" | ||
|
||
return invalid, newer | ||
|
||
|
||
def check_snapshot_urls(urls, snapshots, skip=["test/data/manifests"], errors_only=False): | ||
"""check the urls against the current list of snapshots | ||
Returns: | ||
0 if all were valid and no newer snapshots are available | ||
2 if there were invalid snapshots | ||
3 if there were newer snapshots | ||
6 if there were invalid and newer snapshots | ||
""" | ||
# Gather up the messages for each file | ||
messages = {} | ||
ret = 0 | ||
for url in urls: | ||
invalid, newer = check_baseurl(url, snapshots) | ||
if invalid: | ||
# Add this to each file's invalid message list | ||
for f in urls[url]: | ||
if any(bool(s in f) for s in skip): | ||
continue | ||
ret |= 2 | ||
if f in messages: | ||
if invalid not in messages[f]["invalid"]: | ||
messages[f]["invalid"].append(invalid) | ||
else: | ||
messages[f] = {"invalid": [invalid], "newer": []} | ||
|
||
if errors_only: | ||
continue | ||
|
||
if newer: | ||
# Add this to each file's newer message list | ||
for f in urls[url]: | ||
if any(bool(s in f) for s in skip): | ||
continue | ||
ret |= 4 | ||
if f in messages: | ||
if newer not in messages[f]["newer"]: | ||
messages[f]["newer"].append(newer) | ||
else: | ||
messages[f] = {"newer": [newer], "invalid": []} | ||
|
||
# Print the messages for each file | ||
for f in messages: | ||
print(f"{f}:") | ||
for msg in messages[f]["invalid"]: | ||
print(f" ERROR: {msg}") | ||
for msg in messages[f]["newer"]: | ||
print(f" NEWER: {msg}") | ||
|
||
return ret | ||
|
||
|
||
# parse cmdline args | ||
def parse_args(): | ||
parser =argparse.ArgumentParser(description="Check snapshot urls") | ||
parser.add_argument("--verbose") | ||
parser.add_argument("--timeout", type=int, default=SNAPSHOTS_TIMEOUT, | ||
help="How long to wait for rpmrepo snapshot list") | ||
parser.add_argument("--cache", help="Use a cached file for the list of rpmrepo snapshots") | ||
parser.add_argument("--url", default=SNAPSHOTS_URL, | ||
help="URL to use for the list of rpmrepo snapshots") | ||
parser.add_argument("--errors-only", action="store_true", | ||
help="Only return errors") | ||
parser.add_argument("directory") | ||
return parser.parse_args() | ||
|
||
def main(): | ||
args = parse_args() | ||
urls = find_snapshot_urls(args.directory) | ||
|
||
snapshots = None | ||
if args.cache: | ||
try: | ||
with open(args.cache, encoding="utf8") as f: | ||
snapshots = json.load(f) | ||
except: | ||
print(f"No snapshots cache found at {args.cache}") | ||
sys.exit(1) | ||
else: | ||
snapshots = fetch_snapshots_api(args.url, args.timeout) | ||
if not snapshots: | ||
print(f"Cannot download snapshots from {args.url}") | ||
sys.exit(1) | ||
|
||
return check_snapshot_urls(urls, snapshots, errors_only=args.errors_only) | ||
|
||
|
||
if __name__=='__main__': | ||
sys.exit(main()) |