diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ebbd284b2..982895371 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,11 +1,23 @@ repos: - repo: https://github.com/psf/black - rev: 22.6.0 + rev: 23.3.0 hooks: - id: black + language_version: python3 + args: [--line-length=142] + + - repo: https://github.com/PyCQA/flake8 + rev: 6.0.0 + hooks: + - id: flake8 + additional_dependencies: [flake8-docstrings] + args: [--max-line-length=142] + types: ['python'] - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.3.0 + rev: v4.4.0 hooks: - - id: trailing-whitespace + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace diff --git a/src/presenters/managers/presenters_manager.py b/src/presenters/managers/presenters_manager.py index ea5acd19b..26af45235 100644 --- a/src/presenters/managers/presenters_manager.py +++ b/src/presenters/managers/presenters_manager.py @@ -1,24 +1,42 @@ +"""Manager of all presenters. + +Returns: + _description_ +""" from presenters.pdf_presenter import PDFPresenter from presenters.html_presenter import HTMLPresenter from presenters.text_presenter import TEXTPresenter from presenters.misp_presenter import MISPPresenter +from presenters.message_presenter import MESSAGEPresenter from shared.schema.presenter import PresenterInputSchema, PresenterOutputSchema presenters = {} def initialize(): + """Initialize all presenters.""" register_presenter(PDFPresenter()) register_presenter(HTMLPresenter()) register_presenter(TEXTPresenter()) register_presenter(MISPPresenter()) + register_presenter(MESSAGEPresenter()) def register_presenter(presenter): + """Register a presenter. + + Arguments: + presenter -- Presenter module + """ presenters[presenter.type] = presenter def get_registered_presenters_info(): + """Get info about all presenters. + + Returns: + List with presenter type as key and info as value + """ presenters_info = [] for key in presenters: presenters_info.append(presenters[key].get_info()) @@ -27,6 +45,14 @@ def get_registered_presenters_info(): def generate(presenter_input_json): + """Generate. + + Arguments: + presenter_input_json -- JSON + + Returns: + _description_ + """ presenter_input_schema = PresenterInputSchema() presenter_input = presenter_input_schema.load(presenter_input_json) diff --git a/src/presenters/presenters/base_presenter.py b/src/presenters/presenters/base_presenter.py index 067c95511..a1f00da22 100644 --- a/src/presenters/presenters/base_presenter.py +++ b/src/presenters/presenters/base_presenter.py @@ -1,6 +1,10 @@ from shared.schema.presenter import PresenterSchema from managers import log_manager -import json, datetime, types +import json +import datetime +import types +import re + class BasePresenter: type = "BASE_PRESENTER" @@ -18,14 +22,17 @@ def json_default(value): return dict(value) else: return value.__dict__ + # helper class class AttributesObject: def toJSON(self): return json.dumps(self, default=BasePresenter.json_default, sort_keys=True, indent=4) + # helper class class ReportItemObject: def toJSON(self): return json.dumps(self, default=BasePresenter.json_default, sort_keys=True, indent=4) + def __init__(self, report_item, report_types, attribute_map): # report item itself self.name = report_item.title @@ -69,6 +76,41 @@ class InputDataObject: def toJSON(self): return json.dumps(self, default=BasePresenter.json_default, sort_keys=True, indent=4) + def get_max_tlp(self, reports): + """Returns the highest TLP value from a list of reports + + Args: + reports (list): list of reports + + Returns: + max_tlp: Highest TLP value from the list of reports + """ + color_values = { + 'WHITE': 0, + 'CLEAR': 1, + 'GREEN': 2, + 'AMBER': 3, + 'AMBER+STRICT': 4, + 'RED': 5 + } + colors = [] + + for report in reports: + if report.type == "Vulnerability Report": + colors.append(report.attrs.tlp) + + max_tlp = max(colors, key=lambda color: color_values.get(color, 0)) + if not max_tlp: + max_tlp = "CLEAR" + return max_tlp + + def add_link_prefix(self, report, letter): + pattern = r'\[(\d+)\]' + description = re.sub(pattern, lambda match: f"[{letter}{match.group(1)}]", report.attrs.description) + recommendations = re.sub(pattern, lambda match: f"[{letter}{match.group(1)}]", report.attrs.recommendations) + + return description, recommendations + def __init__(self, presenter_input): # types of report items (e.g. vuln report, disinfo report) report_types = dict() @@ -85,9 +127,21 @@ def __init__(self, presenter_input): self.product = presenter_input.product self.report_items = list() + for report in presenter_input.reports: self.report_items.append(BasePresenter.ReportItemObject(report, report_types, attribute_map)) + letter = 'A' + vul_report_count = 0 + for report in self.report_items: + if report.type == "Vulnerability Report": + report.attrs.description, report.attrs.recommendations = self.add_link_prefix(report, letter) + report.attrs.link_prefix = letter + letter = chr(ord(letter) + 1) + vul_report_count += 1 + if vul_report_count > 0: + self.product.max_tlp = self.get_max_tlp(self.report_items) + def get_info(self): info_schema = PresenterSchema() return info_schema.dump(self) diff --git a/src/presenters/presenters/message_presenter.py b/src/presenters/presenters/message_presenter.py new file mode 100644 index 000000000..23a61403e --- /dev/null +++ b/src/presenters/presenters/message_presenter.py @@ -0,0 +1,70 @@ +"""Create a message presenter. + +Returns: + _description_ +""" +import os +from base64 import b64encode +import jinja2 + +from .base_presenter import BasePresenter +from shared.schema.parameter import Parameter, ParameterType + + +class MESSAGEPresenter(BasePresenter): + """Class for MESSAGE presenter. + + Arguments: + BasePresenter -- Superclass + + Returns: + _description_ + """ + + type = "MESSAGE_PRESENTER" + name = "MESSAGE Presenter" + description = "Presenter for generating message title and body" + + parameters = [ + Parameter(0, "TITLE_TEMPLATE_PATH", "Title template", "Path of message title template file", ParameterType.STRING), + Parameter(0, "BODY_TEMPLATE_PATH", "Body template", "Path to message body template file", ParameterType.STRING), + ] + + parameters.extend(BasePresenter.parameters) + + def generate(self, presenter_input): + """Generate message parts from Jinja templates. + + Arguments: + presenter_input -- Input data for templating + + Returns: + presenter_output -- dict with keys mime_type and data with message parts as subkeys + """ + message_title_template_path = presenter_input.parameter_values_map["TITLE_TEMPLATE_PATH"] + message_body_template_path = presenter_input.parameter_values_map["BODY_TEMPLATE_PATH"] + presenter_output = {"mime_type": "text/plain", "message_title": None, "message_body": None, "data": None} + + def generate_part(template_path): + head, tail = os.path.split(template_path) + input_data = BasePresenter.generate_input_data(presenter_input) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(head)) + func_dict = { + "vars": vars, + } + template = env.get_template(tail) + template.globals.update(func_dict) + output_text = template.render(data=input_data).encode() + base64_bytes = b64encode(output_text) + data = base64_bytes.decode("UTF-8") + return data + + try: + presenter_output["message_title"] = generate_part(message_title_template_path) + presenter_output["message_body"] = generate_part(message_body_template_path) + return presenter_output + + except Exception as error: + BasePresenter.print_exception(self, error) + presenter_output = {"mime_type": "text/plain", "data": b64encode(("TEMPLATING ERROR\n" + str(error)).encode()).decode("UTF-8")} + return presenter_output diff --git a/src/presenters/templates/email_body_template.txt b/src/presenters/templates/email_body_template.txt new file mode 100644 index 000000000..2a07bb265 --- /dev/null +++ b/src/presenters/templates/email_body_template.txt @@ -0,0 +1,25 @@ +Hi, + +{{ data.product.description }} +{% for report_item in data.report_items %}{% if report_item.type != "Vulnerability Report" %}This template cannot be used for item of type "{{ report_item.type }}". It can only handle "Vulnerability Report".{% else %} + +{{ report_item.name }} + +{{ report_item.attrs.description }} + +{% if report_item.attrs.recommendations %} +### Recommendations + +{{ report_item.attrs.recommendations }} + +{% endif %} +### CVE: {% for cve in report_item.attrs.cve %}{{ cve }}{{ ", " if not loop.last else "" }}{% endfor %} +{% if report_item.attrs.links %} +### Links +{% for entry in report_item.attrs.links %} +[{{ report_item.attrs.link_prefix }}{{ loop.index }}] - {{ entry }} +{% endfor %}{% endif %}{% endif %}{% endfor %} +-- +{{ data.product.user.name }} +Cyber Security Team +Acme Corporation diff --git a/src/presenters/templates/email_subject_template.txt b/src/presenters/templates/email_subject_template.txt new file mode 100644 index 000000000..42bdaa589 --- /dev/null +++ b/src/presenters/templates/email_subject_template.txt @@ -0,0 +1 @@ +[TLP: {{ data.product.max_tlp }}] {{ data.product.title }} diff --git a/src/shared/shared/schema/presenter.py b/src/shared/shared/schema/presenter.py index 04657dc7e..2727de543 100644 --- a/src/shared/shared/schema/presenter.py +++ b/src/shared/shared/schema/presenter.py @@ -1,31 +1,74 @@ +"""Schema for presenters. + +Returns: + _type_: _description_ +""" from marshmallow import Schema, fields, post_load from shared.schema.parameter import ParameterSchema from shared.schema.parameter_value import ParameterValueSchema from shared.schema.report_item import ReportItemSchema from shared.schema.report_item_type import ReportItemTypeSchema +from shared.schema.user import UserSchemaBase class PresenterSchema(Schema): + """Schema for presenter. + + Args: + Schema (_type_): Base schema class. + """ + id = fields.Str() type = fields.Str() name = fields.Str() description = fields.Str() parameters = fields.List(fields.Nested(ParameterSchema)) -# schema for "presenter input product" - a dumbed down product suitable for presenters + class PresenterInputProductSchema(Schema): + """Schema for "presenter input product". + + A dumbed down product suitable for presenters. + + Args: + Schema (_type_): Base schema class. + + Returns: + _type_: _description_ + """ + title = fields.Str() description = fields.Str() product_type = fields.Str() product_type_description = fields.Str() + user = fields.Nested(UserSchemaBase, exclude=("password",)) @post_load def make(self, data, **kwargs): + """Make a PresenterInputProduct object from JSON data. + + Args: + data (_type_): JSON data. + + Returns: + _type_: PresenterInputProduct object. + """ return PresenterInputProduct(**data) -# schema for "presenter input" - a complex package of data for presenters + class PresenterInputSchema(Schema): + """Schema for "presenter input". + + A complex package of data for presenters. + + Args: + Schema (_type_): Base schema class. + + Returns: + _type_: _description_ + """ + type = fields.Str() parameter_values = fields.List(fields.Nested(ParameterValueSchema)) reports = fields.List(fields.Nested(ReportItemSchema)) @@ -34,55 +77,90 @@ class PresenterInputSchema(Schema): @post_load def make(self, data, **kwargs): + """Make a PresenterInput object from JSON data. + + Args: + data (_type_): JSON data. + + Returns: + _type_: PresenterInput object. + """ return PresenterInput(**data) -# real data holding object presented by PresenterInputProductSchema + class PresenterInputProduct: - def __init__(self, title, description, product_type, product_type_description): + """Real data holding object presented by PresenterInputProductSchema.""" + + def __init__(self, title, description, product_type, product_type_description, user): + """Initialize the "presenter input product". + + Args: + title (_type_): Title of the product. + description (_type_): Description of the product. + product_type (_type_): Basicaly name of the product. + product_type_description (_type_): Product description. + user (_type_): Data about the user who created the product. + """ self.title = title self.description = description self.product_type = product_type self.product_type_description = product_type_description + self.user = user @classmethod def make_from_product(cls, product): + """Make a PresenterInputProduct object from a Product object. + + Args: + product (_type_): Product object. + + Returns: + _type_: PresenterInputProduct object. + """ return PresenterInputProduct( - product.title, - product.description, - product.product_type.title, - product.product_type.description + product.title, product.description, product.product_type.title, product.product_type.description, product.user ) + class PresenterInput: - def __init__(self, type, product, parameter_values = None, reports = None, report_types = None): - # creating from JSON data + """Class for "presenter input".""" + + def __init__(self, type, product, parameter_values=None, reports=None, report_types=None): + """Initialize the "presenter input". + + Args: + type (_type_): Which presenter to use (e.g. PDF presenter) + product (_type_): Product consisting of report items. + parameter_values (_type_, optional): Arguments for the presenter (e.g. template path). Defaults to None. + reports (_type_, optional): Report items themselves. Defaults to None. + report_types (_type_, optional): Description of the report item types used. Defaults to None. + """ + # Creating from JSON data. if parameter_values is not None: self.load(type, product, parameter_values, reports, report_types) return - # creating from OBJECTS - - # PRESENTER - # - which presenter to use (e.g. PDF presenter) + # Creating from OBJECTS self.type = type - # - arguments for the presenter (e.g. template path) self.parameter_values = product.product_type.parameter_values - - # REPORT ITEMS - # - report items themselves self.reports = product.report_items - # - description of the report item types used report_types = {} for report in self.reports: report_types[report.report_item_type.id] = report.report_item_type self.report_types = list(report_types.values()) - - # PRODUCT self.product = PresenterInputProduct.make_from_product(product) def load(self, type, product, parameter_values, reports, report_types): + """Load data from JSON. + + Args: + type (_type_): _description_ + product (_type_): _description_ + parameter_values (_type_): The same arguments, parsed differently. + reports (_type_): _description_ + report_types (_type_): _description_ + """ self.type = type self.parameter_values = parameter_values - # - the same arguments, parsed differently self.parameter_values_map = dict() for parameter_value in self.parameter_values: self.parameter_values_map.update({parameter_value.parameter.key: parameter_value.value}) @@ -92,15 +170,46 @@ def load(self, type, product, parameter_values, reports, report_types): class PresenterOutputSchema(Schema): + """Schema for "presenter output". + + Args: + Schema (_type_): Base schema class. + + Returns: + _type_: _description_ + """ + mime_type = fields.Str() data = fields.Str() + message_body = fields.Str() + message_title = fields.Str() @post_load def make(self, data, **kwargs): + """Make a PresenterOutput object from JSON data. + + Args: + data (_type_): JSON data. + + Returns: + _type_: PresenterOutput object. + """ return PresenterOutput(**data) class PresenterOutput: - def __init__(self, mime_type, data): + """Class for "presenter output".""" + + def __init__(self, mime_type, data, message_body, message_title): + """Initialize the "presenter output". + + Args: + mime_type (_type_): MIME type of the output. + data (_type_): The data itself. + message_body (_type_): Body of the message. + message_title (_type_): Title of the message. + """ self.mime_type = mime_type self.data = data + self.message_body = message_body + self.message_title = message_title