Skip to content

Commit

Permalink
fixed linting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
infinityofspace committed Nov 13, 2024
1 parent 3e0b375 commit 93f350b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 29 deletions.
2 changes: 2 additions & 0 deletions certbot_dns_duckdns/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
"""Certbot plugin for DNS-01 challenge using DuckDNS."""

__version__ = "v1.4"
36 changes: 22 additions & 14 deletions certbot_dns_duckdns/cert/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
The certbot Authenticator implementation for DuckDNS domains.
"""

import os

from certbot import errors
Expand All @@ -23,13 +27,17 @@ class Authenticator(dns_common.DNSAuthenticator):
"""

description = "Obtain certificates using a DNS TXT record for DuckDNS domains"
old_txt_value = ""

def __init__(self, *args, **kwargs) -> None:
super(Authenticator, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)

self._old_txt_value = ""
self._credentials = None

@classmethod
def add_parser_arguments(cls, add: callable) -> None:
def add_parser_arguments(
cls, add: callable, default_propagation_seconds=DEFAULT_PROPAGATION_SECONDS
) -> None:
"""
Add required or optional argument for the cli of certbot.
Expand Down Expand Up @@ -71,7 +79,7 @@ def _setup_credentials(self) -> None:
if credentials_file:
self._configure_file("credentials", "DuckDNS credentials INI file")
dns_common.validate_file_permissions(credentials_file)
self.credentials = self._configure_credentials(
self._credentials = self._configure_credentials(
"credentials",
"DuckDNS credentials INI file",
{
Expand Down Expand Up @@ -110,7 +118,7 @@ def _perform(self, domain: str, validation_name: str, validation: str) -> None:
raise errors.PluginError("issue resoling TXT record")

# remove the additional quotes around the TXT value
self.old_txt_value = txt_values[0].to_text()[1:-1]
self._old_txt_value = txt_values[0].to_text()[1:-1]

try:
self._get_duckdns_client().set_txt_record(duckdns_domain, validation)
Expand All @@ -132,12 +140,12 @@ def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
duckdns_domain = self._get_duckdns_domain(domain)

try:
if self.old_txt_value == "":
if self._old_txt_value == "":
# setting an empty TXT value does not work with the DuckDNS API
self._get_duckdns_client().clear_txt_record(duckdns_domain)
else:
self._get_duckdns_client().set_txt_record(
duckdns_domain, self.old_txt_value
duckdns_domain, self._old_txt_value
)
except Exception as e:
raise errors.PluginError(e)
Expand All @@ -148,7 +156,7 @@ def _get_duckdns_client(self) -> DuckDNSClient:
:return: the created DuckDNSClient object
"""
token = self.conf("token") or self.credentials.conf("token")
token = self.conf("token") or self._credentials.conf("token")
return DuckDNSClient(token)

def _get_duckdns_domain(self, domain: str) -> str:
Expand All @@ -173,9 +181,9 @@ def _get_duckdns_domain(self, domain: str) -> str:
# check if the delegated domain is a valid duckdns.org domain
if is_valid_full_duckdns_domain(delegated_domain):
return delegated_domain
else:
raise errors.PluginError(NotValidDuckdnsDomainError(delegated_domain))
except (resolver.NXDOMAIN, resolver.NoAnswer) as e:

raise errors.PluginError(NotValidDuckdnsDomainError(delegated_domain))
except (resolver.NXDOMAIN, resolver.NoAnswer):
pass

# delegated acme challenge (ipv6)
Expand All @@ -186,9 +194,9 @@ def _get_duckdns_domain(self, domain: str) -> str:
# check if the delegated domain is a valid duckdns.org domain
if is_valid_full_duckdns_domain(delegated_domain):
return delegated_domain
else:
raise errors.PluginError(NotValidDuckdnsDomainError(delegated_domain))
except (resolver.NXDOMAIN, resolver.NoAnswer) as e:

raise errors.PluginError(NotValidDuckdnsDomainError(delegated_domain))
except (resolver.NXDOMAIN, resolver.NoAnswer):
pass

# invalid domain
Expand Down
71 changes: 56 additions & 15 deletions certbot_dns_duckdns/duckdns/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
This module provides a client for clearing, setting and receiving the TXT record for DuckDNS domains.
"""

import logging
import re

Expand All @@ -16,10 +20,26 @@


def is_valid_duckdns_domain(domain):
"""
Check if the domain is a valid duckdns subdomain.
:param domain: the domain to check
:return: True if the domain is a valid duckdns subdomain, otherwise False
"""

return VALID_DUCKDNS_DOMAIN_REGEX.match(domain) is not None


def is_valid_full_duckdns_domain(domain):
"""
Check if the domain is a valid duckdns domain with the '.duckdns.org' suffix.
:param domain: the domain to check
:return: True if the domain is a valid duckdns domain, otherwise False
"""

return VALID_FULL_DUCKDNS_DOMAIN_REGEX.match(domain) is not None


Expand All @@ -28,7 +48,34 @@ class TXTUpdateError(Exception):
Exception if during the TXT record changing something goes wrong.
"""

pass
template_txt_set = (
'The TXT update "{txt}" for domain "{domain}" could not be set.\n'
"Request status code: {status_code}\n"
"Request response text: {response}"
)

template_txt_delete = (
'The TXT value for domain "{domain}" could not be deleted.\n'
"Request status code: {status_code}\n"
"Request response text: {response}"
)

def __init__(self, domain, status_code, response, txt=None):
self.txt = txt
self.domain = domain
self.status_code = status_code
self.response = response

if txt:
self.message = self.template_txt_set.format(
domain=domain, status_code=status_code, response=response, txt=txt
)
else:
self.message = self.template_txt_delete.format(
domain=domain, status_code=status_code, response=response
)

super().__init__(self.message)


class NotValidDuckdnsDomainError(Exception):
Expand Down Expand Up @@ -69,13 +116,14 @@ def __init__(self, token: str) -> None:

self._token = token

def set_txt_record(self, domain: str, txt: str) -> None:
def set_txt_record(self, domain: str, txt: str, timeout: int = 600) -> None:
"""
Set a TXT record value for a specific DuckDNS domain.
:param domain: the full domain or only the subdomain of duckdns
(e.g. example of the full domain example.duckdns.org) for which the value of the TXT entry should set
:param txt: the string value to set as TXT record
:param timeout: the timeout for the request in seconds
:raise TXTUpdateError: if the TXT record can not be set
:raise NotValidDuckdnsDomainError: if the domain is not a valid duckdns domain
Expand All @@ -87,14 +135,10 @@ def set_txt_record(self, domain: str, txt: str) -> None:
root_domain = self.__get_validated_root_domain__(domain)

params = {"token": self._token, "domains": root_domain, "txt": txt}
r = requests.get(url=BASE_URL, params=params)
r = requests.get(url=BASE_URL, params=params, timeout=timeout)

if r.text != "OK":
raise TXTUpdateError(
'The TXT update "{}" for domain "{}" could not be set.\n'
"Request status code: {}\n"
"Request response text: {}".format(txt, domain, r.status_code, r.text)
)
raise TXTUpdateError(txt, domain, r.status_code, r.text)

@staticmethod
def __get_validated_root_domain__(domain):
Expand All @@ -114,12 +158,13 @@ def __get_validated_root_domain__(domain):

return root_domain

def clear_txt_record(self, domain: str) -> None:
def clear_txt_record(self, domain: str, timeout: int = 600) -> None:
"""
Clear the TXT record for a specific DuckDNS domain.
:param domain: the full domain or only the subdomain of duckdns
(e.g. example of the full domain example.duckdns.org) for which the TXT entry should be cleared
:param timeout: the timeout for the request in seconds
:raise TXTUpdateError: if the TXT record can not be cleared
:raise NotValidDuckdnsDomainError: if the domain is not a valid duckdns domain
Expand All @@ -136,11 +181,7 @@ def clear_txt_record(self, domain: str) -> None:
"txt": "",
"clear": "true",
}
r = requests.get(url=BASE_URL, params=params)
r = requests.get(url=BASE_URL, params=params, timeout=timeout)

if r.text != "OK":
raise TXTUpdateError(
'The clearing of the TXT record for domain "{}" was not successful.\n'
"Request status code: {}\n"
"Request response text: {}".format(domain, r.status_code, r.text)
)
raise TXTUpdateError(domain, r.status_code, r.text, None)

0 comments on commit 93f350b

Please sign in to comment.