-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
2,147 additions
and
5 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,210 @@ | ||
import logging | ||
from abc import ABC | ||
from dataclasses import dataclass | ||
|
||
from fontTools.ttLib import TTFont # type: ignore | ||
from gftools.designers_pb2 import DesignerInfoProto | ||
from gftools.fonts_public_pb2 import FamilyProto | ||
from gftools.push.utils import google_path_to_repo_path | ||
from gftools.util.google_fonts import ReadProto | ||
from gftools.utils import font_version, download_family_from_Google_Fonts, PROD_FAMILY_DOWNLOAD | ||
import zipfile | ||
from bs4 import BeautifulSoup # type: ignore | ||
from pathlib import Path | ||
from axisregistry.axes_pb2 import AxisProto | ||
from google.protobuf.json_format import MessageToDict # type: ignore | ||
from typing import Optional | ||
|
||
|
||
log = logging.getLogger("gftools.items") | ||
|
||
|
||
def jsonify(item): | ||
if item == None: | ||
return item | ||
if isinstance(item, (bool, int, float, str)): | ||
return item | ||
elif isinstance(item, dict): | ||
return {k: jsonify(v) for k, v in item.items()} | ||
elif isinstance(item, (tuple, list)): | ||
return [jsonify(i) for i in item] | ||
if hasattr(item, "to_json"): | ||
return item.to_json() | ||
return item | ||
|
||
|
||
class Itemer(ABC): | ||
def to_json(self): | ||
return jsonify(self.__dict__) | ||
|
||
|
||
@dataclass | ||
class Family(Itemer): | ||
name: str | ||
version: str | ||
|
||
@classmethod | ||
def from_ttfont(cls, fp: str | Path): | ||
ttFont = TTFont(fp) | ||
name = ttFont["name"].getBestFamilyName() | ||
version = font_version(ttFont) | ||
return cls(name, version) | ||
|
||
@classmethod | ||
def from_fp(cls, fp: str | Path): | ||
ttf = list(fp.glob("*.ttf"))[0] # type: ignore | ||
return cls.from_ttfont(ttf) | ||
|
||
@classmethod | ||
def from_gf_json(cls, data, dl_url: str=PROD_FAMILY_DOWNLOAD): | ||
return cls.from_gf(data["family"], dl_url) | ||
|
||
@classmethod | ||
def from_gf(cls, name: str, dl_url: str=PROD_FAMILY_DOWNLOAD): | ||
try: | ||
fonts = download_family_from_Google_Fonts(name, dl_url=dl_url) | ||
ttFont = TTFont(fonts[0]) | ||
version = font_version(ttFont) | ||
name = ttFont["name"].getBestFamilyName() | ||
return cls(name, version) | ||
except zipfile.BadZipFile: | ||
return None | ||
|
||
|
||
@dataclass | ||
class AxisFallback(Itemer): | ||
name: str | ||
value: float | ||
|
||
|
||
@dataclass | ||
class Axis(Itemer): | ||
tag: str | ||
display_name: str | ||
min_value: float | ||
default_value: float | ||
max_value: float | ||
precision: float | ||
fallback: list[AxisFallback] | ||
fallback_only: bool | ||
description: str | ||
|
||
@classmethod | ||
def from_gf_json(cls, axis): | ||
return cls( | ||
tag=axis["tag"], | ||
display_name=axis["displayName"], | ||
min_value=axis["min"], | ||
default_value=axis["defaultValue"], | ||
max_value=axis["max"], | ||
precision=axis["precision"], | ||
fallback=[ | ||
AxisFallback(name=f["name"], value=f["value"]) | ||
for f in axis["fallbacks"] | ||
], | ||
fallback_only=axis["fallbackOnly"], | ||
description=axis["description"], | ||
) | ||
|
||
@classmethod | ||
def from_fp(cls, fp: Path): | ||
log.info("Getting axis data") | ||
|
||
fp = google_path_to_repo_path(fp) | ||
data = MessageToDict(ReadProto(AxisProto(), fp)) | ||
|
||
return cls( | ||
tag=data["tag"], | ||
display_name=data["displayName"], | ||
min_value=data["minValue"], | ||
default_value=data["defaultValue"], | ||
max_value=data["maxValue"], | ||
precision=data["precision"], | ||
fallback=[ | ||
AxisFallback(name=f["name"], value=f["value"]) | ||
for f in data["fallback"] | ||
], | ||
fallback_only=data["fallbackOnly"], | ||
description=data["description"] | ||
) | ||
|
||
def to_json(self): | ||
d = self.__dict__ | ||
d["fallback"] = [f.__dict__ for f in self.fallback] | ||
return d | ||
|
||
|
||
@dataclass | ||
class FamilyMeta(Itemer): | ||
name: str | ||
designer: list[str] | ||
license: str | ||
category: str | ||
subsets: list[str] | ||
stroke: str | ||
classifications: list[str] | ||
description: str | ||
primary_script: Optional[str] = None | ||
|
||
@classmethod | ||
def from_fp(cls, fp: Path): | ||
meta_fp = fp / "METADATA.pb" | ||
data = ReadProto(FamilyProto(), meta_fp) | ||
description = open(fp / "DESCRIPTION.en_us.html").read() | ||
stroke = data.category[0] if not data.stroke else data.stroke.replace(" ", "_").upper() | ||
return cls( | ||
name=data.name, | ||
designer=data.designer.split(","), | ||
license=data.license.lower(), | ||
category=data.category[0], | ||
subsets=sorted([s for s in data.subsets if s != "menu"]), | ||
stroke=stroke, | ||
classifications=[c.lower() for c in data.classifications], | ||
description=parse_html(description), | ||
primary_script=None if data.primary_script == "" else data.primary_script | ||
) | ||
|
||
@classmethod | ||
def from_gf_json(cls, meta): | ||
stroke = ( | ||
None if meta["stroke"] == None else meta["stroke"].replace(" ", "_").upper() | ||
) | ||
return cls( | ||
name=meta["family"], | ||
designer=[i["name"] for i in meta["designers"]], | ||
license=meta["license"].lower(), | ||
category=meta["category"].replace(" ", "_").upper(), | ||
subsets=sorted(list(meta["coverage"].keys())), | ||
stroke=stroke, | ||
classifications=[c.lower() for c in meta["classifications"]], | ||
description=parse_html(meta["description"]), | ||
primary_script=None if meta["primaryScript"] == "" else meta["primaryScript"] | ||
) | ||
|
||
|
||
def parse_html(string: str): | ||
return BeautifulSoup(string.replace("\n", " ").replace(" ", " "), features="lxml").prettify().strip() | ||
|
||
|
||
@dataclass | ||
class Designer(Itemer): | ||
name: str | ||
bio: str | ||
|
||
@classmethod | ||
def from_gf_json(cls, data): | ||
return cls(data["name"], data["bio"]) | ||
|
||
@classmethod | ||
def from_fp(cls, fp): | ||
meta = ReadProto(DesignerInfoProto(), fp / "info.pb") | ||
name = meta.designer | ||
bio_fp = fp / "bio.html" | ||
if not bio_fp.exists(): | ||
return cls(name, None) | ||
with open(bio_fp) as doc: | ||
bio = doc.read() | ||
return cls(name, bio) | ||
|
||
|
||
Items = Axis | Designer | Family | FamilyMeta |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import json | ||
import logging | ||
import os | ||
from configparser import ConfigParser | ||
from datetime import datetime | ||
from functools import cache | ||
from pathlib import Path | ||
|
||
import requests # type: ignore | ||
from gftools.push.items import Axis, AxisFallback, Designer, Family, FamilyMeta, Itemer, Items | ||
from gftools.utils import ( | ||
PROD_FAMILY_DOWNLOAD, | ||
) | ||
|
||
log = logging.getLogger("gftools.servers") | ||
|
||
|
||
|
||
# This module uses api endpoints which shouldn't be public. Ask | ||
# Marc Foley for the .gf_push_config.ini file. Place this file in your | ||
# home directory. Environment variables can also be used instead. | ||
config = ConfigParser() | ||
config.read(os.path.join(os.path.expanduser("~"), ".gf_push_config.ini")) | ||
|
||
SANDBOX_META_URL = os.environ.get("SANDBOX_META_URL") or config["urls"]["sandbox_meta"] | ||
PRODUCTION_META_URL = ( | ||
os.environ.get("PRODUCTION_META_URL") or config["urls"]["production_meta"] | ||
) | ||
DEV_META_URL = os.environ.get("DEV_META_URL") or config["urls"]["dev_meta"] | ||
SANDBOX_FAMILY_DOWNLOAD = ( | ||
os.environ.get("SANDBOX_FAMILY_DOWNLOAD") | ||
or config["urls"]["sandbox_family_download"] | ||
) | ||
DEV_FAMILY_DOWNLOAD = ( | ||
os.environ.get("DEV_FAMILY_DOWNLOAD") or config["urls"]["dev_family_download"] | ||
) | ||
|
||
|
||
@cache | ||
def gf_server_metadata(url: str): | ||
"""Get family json data from a Google Fonts metadata url""" | ||
# can't do requests.get("url").json() since request text starts with ")]}'" | ||
info = requests.get(url).json() | ||
|
||
return {i["family"]: i for i in info["familyMetadataList"]} | ||
|
||
|
||
@cache | ||
def gf_server_family_metadata(url: str, family: str): | ||
"""Get metadata for family on a server""" | ||
# can't do requests.get("url").json() since request text starts with ")]}'" | ||
url = url + f"/{family.replace(' ', '%20')}" | ||
r = requests.get(url) | ||
if r.status_code != 200: | ||
return None | ||
text = r.text | ||
info = json.loads(text[4:]) | ||
return info | ||
|
||
|
||
class GFServer(Itemer): | ||
def __init__(self, name: str, url: str=PRODUCTION_META_URL, dl_url: str=PROD_FAMILY_DOWNLOAD): | ||
self.name = name | ||
self.url = url | ||
self.dl_url = dl_url | ||
self.families: dict[str, Family] = {} | ||
self.designers: dict[str, Designer] = {} | ||
self.metadata: dict[str, FamilyMeta] = {} | ||
self.axisregistry: dict[str, Axis] = {} | ||
|
||
def compare_push_item(self, item: Items): | ||
server_item = self.find_item(item) | ||
return server_item == item | ||
|
||
def find_item(self, item): | ||
if isinstance(item, Family): | ||
server_item = self.families.get(item.name) | ||
elif isinstance(item, Designer): | ||
server_item = self.designers.get(item.name) | ||
elif isinstance(item, Axis): | ||
server_item = self.axisregistry.get(item.tag) | ||
elif isinstance(item, FamilyMeta): | ||
server_item = self.metadata.get(item.name) | ||
else: | ||
return None | ||
return server_item | ||
|
||
def update_axis_registry(self, axis_data): | ||
for axis in axis_data: | ||
self.axisregistry[axis["tag"]] = Axis.from_gf_json(axis) | ||
|
||
def update_family(self, name: str): | ||
family = Family.from_gf(name, dl_url=self.dl_url) | ||
if family: | ||
self.families[name] = family | ||
return True | ||
return False | ||
|
||
def update_family_designers(self, name: str): | ||
meta = gf_server_family_metadata(self.url, name) | ||
for designer in meta["designers"]: | ||
self.designers[designer["name"]] = Designer.from_gf_json(designer) | ||
|
||
def update_metadata(self, name: str): | ||
meta = gf_server_family_metadata(self.url, name) | ||
self.metadata[meta["family"]] = FamilyMeta.from_gf_json(meta) | ||
|
||
def update(self, last_checked: str): | ||
meta = requests.get(self.url).json() | ||
self.update_axis_registry(meta["axisRegistry"]) | ||
|
||
families_data = meta["familyMetadataList"] | ||
for family_data in families_data: | ||
family_name = family_data["family"] | ||
last_modified = family_data["lastModified"] | ||
if last_modified > last_checked: | ||
log.info(f"Updating {family_name}") | ||
if self.update_family(family_name): | ||
self.update_family_designers(family_name) | ||
self.update_metadata(family_name) | ||
|
||
|
||
class GFServers(Itemer): | ||
|
||
DEV = "dev" | ||
SANDBOX = "sandbox" | ||
PRODUCTION = "production" | ||
SERVERS = (DEV, SANDBOX, PRODUCTION) | ||
|
||
def __init__(self): | ||
self.last_checked = datetime.fromordinal(1).isoformat().split("T")[0] | ||
self.dev = GFServer(GFServers.DEV, DEV_META_URL, DEV_FAMILY_DOWNLOAD) | ||
self.sandbox = GFServer(GFServers.SANDBOX, SANDBOX_META_URL, SANDBOX_FAMILY_DOWNLOAD) | ||
self.production = GFServer( | ||
GFServers.PRODUCTION, PRODUCTION_META_URL, PROD_FAMILY_DOWNLOAD | ||
) | ||
|
||
def __iter__(self): | ||
for server in GFServers.SERVERS: | ||
yield getattr(self, server) | ||
|
||
def update(self): | ||
for server in self: | ||
server.update(self.last_checked) | ||
self.last_checked = datetime.now().isoformat().split("T")[0] | ||
|
||
def compare_item(self, item: Items): | ||
res = item.to_json() | ||
for server in self: | ||
res[f"In {server.name}"] = server.compare_push_item(item) | ||
return res | ||
|
||
def save(self, fp: str | Path): | ||
data = self.to_json() | ||
json.dump(data, open(fp, "w"), indent=4) | ||
|
||
@classmethod | ||
def open(cls, fp: str | Path): | ||
data = json.load(open(fp)) | ||
return cls.from_dict(data) | ||
|
||
@classmethod | ||
def from_dict(cls, data): | ||
inst = cls() | ||
inst.last_checked = data["last_checked"] | ||
for server_name in GFServers.SERVERS: | ||
server = getattr(inst, server_name) | ||
|
||
for item_type, item_value in data[server_name].items(): | ||
if item_type == "families": | ||
server.families = {k: Family(**v) for k, v in item_value.items()} | ||
elif item_type == "designers": | ||
server.designers = {k: Designer(**v) for k, v in item_value.items()} | ||
elif item_type == "metadata": | ||
server.metadata = { | ||
k: FamilyMeta(**v) for k, v in item_value.items() | ||
} | ||
elif item_type == "axisregistry": | ||
server.axisregistry = {k: Axis(**v) for k, v in item_value.items()} | ||
for _, v in server.axisregistry.items(): | ||
v.fallback = [AxisFallback(**a) for a in v.fallback] | ||
else: | ||
setattr(server, item_type, item_value) | ||
return inst |
Oops, something went wrong.