-
Notifications
You must be signed in to change notification settings - Fork 22
/
main.py
119 lines (94 loc) · 4.86 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from pathlib import Path
import argparse
import yaml
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate, load_der_x509_certificate
from trust_stores_observatory import __version__
from trust_stores_observatory.certificates_repository import RootCertificatesRepository
from trust_stores_observatory.store_fetcher import TrustStoreFetcher
from trust_stores_observatory.trust_store import PlatformEnum, TrustStore
ROOT_PATH = Path(__file__).parent.resolve()
def import_certificates(folder_with_certs_to_import: Path) -> None:
repo = RootCertificatesRepository(Path("certificates"))
import_errors = []
for cert_path in folder_with_certs_to_import.glob("*"):
if cert_path.name.endswith(".pem") or cert_path.name.endswith(".crt"):
cert_content = cert_path.read_text().encode("ascii")
parsing_function = load_pem_x509_certificate
elif cert_path.name.endswith(".der"):
cert_content = cert_path.read_bytes()
parsing_function = load_der_x509_certificate
else:
print(f"Skipping file {cert_path}.")
continue
try:
parsed_cert = parsing_function(cert_content, default_backend())
except ValueError as e:
import_errors.append(f"Error loading {cert_path}: {e.args[0]}")
continue
new_cert_path = repo.store_certificate(parsed_cert)
print(f"Stored certificate at {new_cert_path}")
if import_errors:
print("\nWARNING: Not all certificates could be imported:")
for error_mgs in import_errors:
print(error_mgs)
def refresh_trust_stores() -> None:
"""Fetch each trust store and update the corresponding local YAML file at ./trust_stores."""
# Also pass the local certs repo so it gets updated when fetching the trust stores
certs_repo = RootCertificatesRepository.get_default()
# For each supported platform, fetch the trust store
store_fetcher = TrustStoreFetcher()
for platform in PlatformEnum:
if platform in [PlatformEnum.ORACLE_JAVA, PlatformEnum.OPENJDK]:
# TODO: Fix this
print(f"Skipping {platform.name}... TODO: Fixme")
continue
print(f"Refreshing {platform.name}...")
fetched_store = store_fetcher.fetch(platform, certs_repo)
# Compare the existing trust store with the one we fetched
has_store_changed = False
store_path = Path(ROOT_PATH) / "trust_stores" / f"{fetched_store.platform.name.lower()}.yaml"
try:
existing_store = TrustStore.from_yaml(store_path)
if existing_store != fetched_store:
has_store_changed = True
except FileNotFoundError:
# The store does not exist in the repo yet
has_store_changed = True
if has_store_changed:
print(f"Detected changes for {platform.name}; updating store...")
with open(store_path, mode="w") as store_file:
yaml.dump(fetched_store, store_file, encoding="utf-8", default_flow_style=False)
else:
print(f"No changes detected for {platform.name}")
def export_trust_stores() -> None:
"""Export the content of the trust store of each supported platform to a PEM file at ./export."""
certs_repo = RootCertificatesRepository.get_default()
out_pem_folder = ROOT_PATH / "export"
out_pem_folder.mkdir(exist_ok=True)
# Export each trust store as a PEM file to ./export
print(f"Exporting stores as PEM to {out_pem_folder}...")
for platform in PlatformEnum:
print(f"Exporting {platform.name}...")
store = TrustStore.get_default_for_platform(platform)
all_certs_pem = store.export_trusted_certificates_as_pem(certs_repo)
out_pem_path = out_pem_folder / f"{platform.name.lower()}.pem"
with open(out_pem_path, mode="w") as out_pem_file:
out_pem_file.write(all_certs_pem)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Trust Store Observatory CLI.")
parser.add_argument("--version", action="version", version=__version__)
parser.add_argument("--import_certificates", action="store", help=str(import_certificates.__doc__))
parser.add_argument("--export", action="store_true", help=str(export_trust_stores.__doc__))
parser.add_argument("--refresh", action="store_true", help=str(refresh_trust_stores.__doc__))
args = parser.parse_args()
if (args.export and args.import_certificates) or (args.refresh and args.import_certificates):
raise ValueError("Cannot combine --import_certificate with other options.")
if args.import_certificates:
certs_folder = Path(args.import_certificates)
import_certificates(certs_folder)
# Always refresh before exporting
if args.refresh:
refresh_trust_stores()
if args.export:
export_trust_stores()