Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate JSON schema #63

Merged
merged 3 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 311 additions & 14 deletions _extensions/cps.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
from sphinx import domains
from sphinx import addnodes
import os
import re
from dataclasses import dataclass
from typing import cast

from docutils import nodes
from docutils.parsers.rst import directives, roles
from docutils.parsers.rst import Directive, directives, roles
from docutils.transforms import Transform

import re
from jsb import JsonSchema

from sphinx import addnodes, domains
from sphinx.util import logging
from sphinx.util.nodes import clean_astext

logger = logging.getLogger(__name__)

#==============================================================================
# =============================================================================
def simplify_text(node):
paragraphs = []
for p in clean_astext(node).split('\n\n'):
paragraphs.append(p.replace('\n', ' '))

return '\n\n'.join(paragraphs)

# =============================================================================
def make_code(text, classname):
return nodes.literal(text, text, classes=['code', classname])

# =============================================================================
class InternalizeLinks(Transform):
default_priority = 200

#--------------------------------------------------------------------------
# -------------------------------------------------------------------------
def is_internal_link(self, refuri):
if not refuri:
return True
Expand All @@ -21,7 +41,7 @@ def is_internal_link(self, refuri):

return True

#--------------------------------------------------------------------------
# -------------------------------------------------------------------------
def apply(self, **kwargs):
for ref in self.document.findall(nodes.reference):
# Skip inter-document links
Expand Down Expand Up @@ -56,14 +76,223 @@ def apply(self, **kwargs):
# Replace the old node
ref.replace_self(xref)

#==============================================================================
# =============================================================================
class ObjectDirective(Directive):
required_arguments = 1
optional_arguments = 0
has_content = True

# -------------------------------------------------------------------------
def run(self):
name = self.arguments[0]

# Create section node
section = nodes.section()
section.document = self.state.document
section.line = self.lineno
section['names'].append(f'{name}(object)')

# Create text nodes for section title
title = [
make_code(name, 'object'),
nodes.inline('(object)', '(object)', classes=['hidden']),
]
section += nodes.title(name, '', *title)

# Parse object description
content = nodes.Element()
self.state.nested_parse(self.content, self.content_offset, content)
section += content.children

# Record section reference
self.state.document.note_implicit_target(section, section)

# Record object on domain
env = self.state.document.settings.env
domain = cast(CpsDomain, env.get_domain('cps'))
domain.note_object(name, simplify_text(content))

# Return generated nodes
return [section]

# =============================================================================
class AttributeDirective(Directive):
required_arguments = 1
optional_arguments = 0
has_content = True
option_spec = {
'type': directives.unchanged,
'context': lambda a: a.split(),
'overload': directives.flag,
'required': directives.flag,
'conditionally-required': directives.flag,
}

# -------------------------------------------------------------------------
def make_field(self, name, rawtext, body):
src, srcline = self.state.state_machine.get_source_and_line()

field = nodes.field()
field.source = src
field.line = srcline

field += nodes.field_name(name, name)
field += nodes.field_body(rawtext, *body)

return field

# -------------------------------------------------------------------------
def make_list(self, values, nodetype, classes):
content = [nodetype(values[0], values[0], classes=classes)]
for value in values[1:]:
content += [
nodes.Text(', '),
nodetype(value, value, classes=classes),
]
return content

# -------------------------------------------------------------------------
def parse_type(self, typedesc):
if '|' in typedesc:
types = typedesc.split('|')
content = self.parse_type(types[0])
for t in types[1:]:
content += [
nodes.Text(' '),
nodes.inline('or', 'or', classes=['separator']),
nodes.Text(' '),
] + self.parse_type(t)

return content

m = re.match(r'^(list|map)[(](.*)[)]$', typedesc)
if m:
outer, inner = m.groups()
content = [
make_code(outer, 'type'),
nodes.Text(' of '),
]
if outer == 'map':
content += [
make_code('string', 'type'),
nodes.Text(' to '),
]

return content + self.parse_type(inner)

elif typedesc in {'string'}:
return [make_code(typedesc, 'type')]

else:
return [make_code(typedesc, 'object')]

# -------------------------------------------------------------------------
def run(self):
name = self.arguments[0]
typedesc = self.options['type']
context = self.options['context']
overload = 'overload' in self.options
required = 'required' in self.options
conditionally_required = 'conditionally-required' in self.options

if overload:
target = f'{name} ({context[0]})'
else:
target = name

if required:
required_text = 'Yes'
elif conditionally_required:
required_text = 'Special'
else:
required_text = 'No'

# Create section node
section = nodes.section()
section.document = self.state.document
section.line = self.lineno
section['names'].append(target)

# Create text nodes for section title
title = [make_code(name, 'attribute')]
if overload:
title += [
nodes.Text(' '),
nodes.inline(f'({context[0]})', f'({context[0]})',
classes=['applies-to']),
]
section += nodes.title(target, '', *title)

# Create nodes for attribute information
fields = nodes.field_list()
fields += self.make_field(
'Type', typedesc, self.parse_type(typedesc))
fields += self.make_field(
'Applies To', ', '.join(context),
self.make_list(context, nodes.literal, ['code', 'object']))
fields += self.make_field(
'Required', required_text, [nodes.Text(required_text)])
section += fields

# Parse attribute description
content = nodes.Element()
self.state.nested_parse(self.content, self.content_offset, content)
section += content.children

# Record section reference
self.state.document.note_implicit_target(section, section)

# Record object on domain
env = self.state.document.settings.env
domain = cast(CpsDomain, env.get_domain('cps'))
domain.note_attribute(name, context, typedesc, required=required,
description=simplify_text(content), node=section)

# Return generated nodes
return [section]

# =============================================================================
@dataclass
class Attribute:
typedesc: str
description: str
required: bool

# =============================================================================
class AttributeSet:

# -------------------------------------------------------------------------
def __init__(self, name, context, attribute, node):
self.name = name
self.instances = [attribute]
self.context = {c: (0, node) for c in context}

# -------------------------------------------------------------------------
def overload(self, context, attribute, node):
i = len(self.instances)
self.instances.append(attribute)
for c in context:
if c in self.context:
logger.warning('duplicate declaration of attribute '
f'{self.name!r} on object {c!r}',
location=node)
logger.warning(f'{self.name!r} was previously declared here',
location=self.context[c][1])
else:
self.context[c] = (i, node)

# =============================================================================
class CpsDomain(domains.Domain):
name = 'cps'

#--------------------------------------------------------------------------
# -------------------------------------------------------------------------
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# Site-specific directives (used for JSON schema generation)
self.directives['object'] = ObjectDirective
self.directives['attribute'] = AttributeDirective

# Site-specific custom roles (these just apply styling)
self.add_role('hidden')
self.add_role('applies-to')
Expand All @@ -81,24 +310,92 @@ def __init__(self, *args, **kwargs):
self.add_code_role('var')
self.add_code_role('env')

#--------------------------------------------------------------------------
# -------------------------------------------------------------------------
@property
def objects(self):
return self.data.setdefault('objects', {})

# -------------------------------------------------------------------------
@property
def attributes(self):
return self.data.setdefault('attributes', {})

# -------------------------------------------------------------------------
def note_object(self, name, description):
if name not in self.objects:
self.objects[name] = description

# -------------------------------------------------------------------------
def note_attribute(self, name, context, typedesc,
required, description, node):
a = Attribute(typedesc, description, required)
if name not in self.attributes:
self.attributes[name] = AttributeSet(name, context, a, node)
else:
self.attributes[name].overload(context, a, node)

# -------------------------------------------------------------------------
def add_role(self, name, styles=None, parent=roles.generic_custom_role):
options = {}
if styles is None:
styles=name
styles = name
else:
styles=' '.join([name] + styles)
styles = ' '.join([name] + styles)

options['class'] = directives.class_option(styles)
self.roles[name] = roles.CustomRole(name, parent, options)

#--------------------------------------------------------------------------
# -------------------------------------------------------------------------
def add_code_role(self, name, styles=None, parent=roles.code_role):
self.add_role(name, styles, parent)

#==============================================================================

# =============================================================================
def write_schema(app, exception):
if exception is not None:
return

config = app.env.config
title = f'{config.project} v{config.version}'

domain = cast(CpsDomain, app.env.get_domain('cps'))
schema = JsonSchema(title, config.schema_id)

object_attributes = {}
for attribute_set in domain.attributes.values():
for i, attribute in enumerate(attribute_set.instances):
schema.add_attribute(
attribute_set.name, i,
attribute.typedesc,
attribute.description,
)

for context, attribute_ref in attribute_set.context.items():
attribute = (
attribute_set.name,
attribute_ref[0],
attribute_set.instances[attribute_ref[0]].required
)
if context in object_attributes:
object_attributes[context].append(attribute)
else:
object_attributes[context] = [attribute]

for name, description in domain.objects.items():
schema.add_object_type(name, description, object_attributes[name])

output_path = os.path.join(app.outdir, config.schema_filename)
schema.write(config.schema_root_object, output_path)

# =============================================================================
def setup(app):
app.add_domain(CpsDomain)
app.add_config_value('schema_id', '', '', [str])
app.add_config_value('schema_filename', 'schema.json', '', [str])
app.add_config_value('schema_root_object', None, '', [str])

# Add custom transform to resolve cross-file references
app.add_transform(InternalizeLinks)

# Add hook to write machine-readable schema description on completion
app.connect('build-finished', write_schema)
Loading