diff --git a/sickchill/gui/slick/views/displayShow.mako b/sickchill/gui/slick/views/displayShow.mako
index 81703ad42c..dc8fc3562d 100644
--- a/sickchill/gui/slick/views/displayShow.mako
+++ b/sickchill/gui/slick/views/displayShow.mako
@@ -1,7 +1,7 @@
<%inherit file="/layouts/main.mako" />
<%!
import datetime
- from urllib.parse import quote
+ from urllib.parse import quote, urljoin
from sickchill import settings
from sickchill.oldbeard import subtitles, notifiers, scdatetime, network_timezones, helpers
@@ -548,10 +548,10 @@
% if settings.DOWNLOAD_URL and epResult['location']:
<%
filename = epResult['location']
- for rootDir in settings.ROOT_DIRS.split('|'):
- if rootDir.startswith('/'):
- filename = filename.replace(rootDir, "")
- filename = settings.DOWNLOAD_URL + quote(filename)
+ for rootDir in settings.ROOT_DIRS.split('|')[1:]:
+ if filename.startswith(rootDir):
+ filename = filename.replace(rootDir, "").lstrip("/\\")
+ filename = urljoin(settings.DOWNLOAD_URL, quote(filename))
%>
${_('Download')}
% endif
diff --git a/sickchill/oldbeard/config.py b/sickchill/oldbeard/config.py
index b697470bb5..db6340585b 100644
--- a/sickchill/oldbeard/config.py
+++ b/sickchill/oldbeard/config.py
@@ -2,6 +2,7 @@
import os.path
import platform
import re
+from pathlib import Path
from urllib import parse
import rarfile
@@ -300,7 +301,9 @@ def change_unpack_dir(unpack_dir):
return True
if os.path.normpath(settings.UNPACK_DIR) != os.path.normpath(unpack_dir):
- if bool(settings.ROOT_DIRS) and any(helpers.is_subdirectory(unpack_dir, rd) for rd in settings.ROOT_DIRS.split("|")[1:]):
+ if settings.ROOT_DIRS and any(
+ Path(root_directory).resolve() in Path(unpack_dir).resolve().parents for root_directory in settings.ROOT_DIRS.split("|")[1:]
+ ):
# don't change if it's in any of the TV root directories
logger.info("Unable to change unpack directory to a sub-directory of a TV root dir")
return False
diff --git a/sickchill/oldbeard/helpers.py b/sickchill/oldbeard/helpers.py
index 26f5d1c126..5e02907751 100644
--- a/sickchill/oldbeard/helpers.py
+++ b/sickchill/oldbeard/helpers.py
@@ -950,31 +950,6 @@ def has_hidden_attribute(filepath):
return False
-def real_path(path):
- """
- Returns:
- the canonicalized absolute pathname. The resulting path will have no symbolic link, '/./' or '/../' components.
- """
- return os.path.normpath(os.path.normcase(os.path.realpath(path)))
-
-
-def is_subdirectory(subdir_path, topdir_path):
- """
- Returns true if a subdir_path is a subdirectory of topdir_path
- else otherwise.
-
- Parameters:
- subdir_path: The full path to the subdirectory
- topdir_path: The full path to the top directory to check subdir_path against
- """
- topdir_path = real_path(topdir_path)
- subdir_path = real_path(subdir_path)
-
- # checks if the common prefix of both is equal to directory
- # e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
- return os.path.commonprefix([subdir_path, topdir_path]) == topdir_path
-
-
def set_up_anidb_connection():
"""Connect to anidb"""
diff --git a/sickchill/oldbeard/processTV.py b/sickchill/oldbeard/processTV.py
index 3d2a361b59..0735da2eb1 100644
--- a/sickchill/oldbeard/processTV.py
+++ b/sickchill/oldbeard/processTV.py
@@ -36,30 +36,31 @@ def delete_folder(folder, check_empty=True):
return: True on success, False on failure
"""
+ folder = Path(folder).resolve()
# check if it's a folder
- if not os.path.isdir(folder):
+ if not folder.is_dir():
return False
# check if it isn't TV_DOWNLOAD_DIR
- if settings.TV_DOWNLOAD_DIR and helpers.real_path(folder) == helpers.real_path(settings.TV_DOWNLOAD_DIR):
+ if settings.TV_DOWNLOAD_DIR and str(Path(folder).resolve()) == str(Path(settings.TV_DOWNLOAD_DIR).resolve()):
return False
# check if it's empty folder when wanted to be checked
if check_empty:
- check_files = os.listdir(folder)
- if check_files:
- logger.info(f"Not deleting folder {folder} found the following files: {check_files}")
+ found_files = [file for file in folder.iterdir()]
+ if found_files:
+ logger.info(f"Not deleting folder {folder} found the following files: {found_files}")
return False
try:
logger.info(f"Deleting folder (if it's empty): {folder}")
- os.rmdir(folder)
+ folder.rmdir()
except (OSError, IOError) as error:
logger.warning(f"Warning: unable to delete folder: {folder}: {error}")
return False
else:
try:
- logger.info("Deleting folder: " + folder)
+ logger.info(f"Deleting folder: {folder}")
shutil.rmtree(folder)
except (OSError, IOError) as error:
logger.warning(f"Warning: unable to delete folder: {folder}: {error}")
@@ -82,27 +83,28 @@ def delete_files(process_path, unwanted_files, result, force=False):
elif not result.result:
return
+ process_path = Path(process_path)
# Delete all file not needed
- for cur_file in unwanted_files:
- cur_file_path = os.path.join(process_path, cur_file)
- if not os.path.isfile(cur_file_path):
+ for current_file in unwanted_files:
+ file_path = process_path / current_file
+ if not file_path.is_file():
continue # Prevent error when a notwantedfiles is an associated files
- result.output += log_helper(f"Deleting file: {cur_file}", logger.DEBUG)
+ result.output += log_helper(f"Deleting file: {current_file}", logger.DEBUG)
# check first the read-only attribute
- file_attribute = os.stat(cur_file_path)[0]
+ file_attribute = file_path.stat()[0]
if not file_attribute & stat.S_IWRITE:
# File is read-only, so make it writeable
- result.output += log_helper(f"Changing ReadOnly Flag for file: {cur_file}", logger.DEBUG)
+ result.output += log_helper(f"Changing ReadOnly Flag for file: {current_file}", logger.DEBUG)
try:
- os.chmod(cur_file_path, stat.S_IWRITE)
+ file_path.chmod(stat.S_IWRITE)
except OSError as error:
- result.output += log_helper(f"Cannot change permissions of {cur_file_path}: {error}", logger.DEBUG)
+ result.output += log_helper(f"Cannot change permissions of {current_file}: {error}", logger.DEBUG)
try:
- os.remove(cur_file_path)
+ file_path.unlink(True)
except OSError as error:
- result.output += log_helper(f"Unable to delete file {cur_file}: {error}", logger.DEBUG)
+ result.output += log_helper(f"Unable to delete file {current_file}: {error}", logger.DEBUG)
def log_helper(message, level=logging.INFO):
@@ -132,13 +134,17 @@ def process_dir(process_path, release_name=None, process_method=None, force=Fals
# if the client and SickChill are not on the same machine translate the directory into a network directory
elif all(
- [settings.TV_DOWNLOAD_DIR, os.path.isdir(settings.TV_DOWNLOAD_DIR), os.path.normpath(process_path) == os.path.normpath(settings.TV_DOWNLOAD_DIR)]
+ [
+ settings.TV_DOWNLOAD_DIR,
+ Path(settings.TV_DOWNLOAD_DIR).is_dir(),
+ str(Path(process_path).resolve()) == str(Path(settings.TV_DOWNLOAD_DIR).resolve()),
+ ]
):
process_path = os.path.join(settings.TV_DOWNLOAD_DIR, os.path.abspath(process_path).split(os.path.sep)[-1])
result.output += log_helper(f"Trying to use folder: {process_path} ", logger.DEBUG)
# if we didn't find a real dir then quit
- if not os.path.isdir(process_path):
+ if not Path(process_path).is_dir():
result.output += log_helper(
"Unable to figure out what folder to process. "
"If your downloader and SickChill aren't on the same PC "
@@ -162,6 +168,8 @@ def process_dir(process_path, release_name=None, process_method=None, force=Fals
result.output += log_helper(_("Processing {process_path}").format(process_path=process_path))
generator_to_use = os.walk(process_path, followlinks=settings.PROCESSOR_FOLLOW_SYMLINKS)
+ rar_files = []
+
for current_directory, directory_names, filenames in generator_to_use:
result.result = True
@@ -273,7 +281,11 @@ def validate_dir(process_path, release_name, failed, result):
result.missed_files.append(f"{process_path} : Failed download")
return False
- if settings.TV_DOWNLOAD_DIR and helpers.real_path(process_path) != helpers.real_path(settings.TV_DOWNLOAD_DIR) and helpers.is_hidden_folder(process_path):
+ if (
+ settings.TV_DOWNLOAD_DIR
+ and str(Path(process_path).resolve()) != str(Path(settings.TV_DOWNLOAD_DIR).resolve())
+ and helpers.is_hidden_folder(process_path)
+ ):
result.output += log_helper(f"Ignoring hidden folder: {process_path}", logger.DEBUG)
if not process_path.endswith("@eaDir"):
result.missed_files.append(f"{process_path} : Hidden folder")
diff --git a/sickchill/oldbeard/traktChecker.py b/sickchill/oldbeard/traktChecker.py
index a3b162f9ff..378a199524 100644
--- a/sickchill/oldbeard/traktChecker.py
+++ b/sickchill/oldbeard/traktChecker.py
@@ -420,7 +420,7 @@ def _add_show_with_defaults(indexer, indexer_id, name, status):
root_dirs = settings.ROOT_DIRS.split("|")
try:
- location = root_dirs[int(root_dirs[0]) + 1]
+ location = root_dirs[int(root_dirs[0])]
except Exception:
location = None
diff --git a/sickchill/show/indexers/__init__.py b/sickchill/show/indexers/__init__.py
index 4db92dae0f..7c5097467c 100644
--- a/sickchill/show/indexers/__init__.py
+++ b/sickchill/show/indexers/__init__.py
@@ -1,3 +1,5 @@
+from typing import Union
+
from .handler import ShowIndexer
-indexer = None
+indexer: Union[ShowIndexer, None] = None
diff --git a/sickchill/start.py b/sickchill/start.py
index 689690d4d0..93accdf352 100644
--- a/sickchill/start.py
+++ b/sickchill/start.py
@@ -1236,7 +1236,7 @@ def save_config():
"metadata_mede8er": settings.METADATA_MEDE8ER,
"backlog_days": int(settings.BACKLOG_DAYS),
"backlog_missing_only": int(settings.BACKLOG_MISSING_ONLY),
- "root_dirs": settings.ROOT_DIRS if settings.ROOT_DIRS else "",
+ "root_dirs": settings.ROOT_DIRS or "",
"tv_download_dir": settings.TV_DOWNLOAD_DIR,
"keep_processed_dir": int(settings.KEEP_PROCESSED_DIR),
"process_method": settings.PROCESS_METHOD,
diff --git a/sickchill/tv.py b/sickchill/tv.py
index 999723779d..77f2727d07 100644
--- a/sickchill/tv.py
+++ b/sickchill/tv.py
@@ -880,7 +880,11 @@ def load_imdb_info(self):
if not self.imdb_id:
# TODO: Load tvmaze/tvdb info into other imdb_info fields
- self.imdb_id = helpers.imdb_from_tvdbid_on_tvmaze(self.indexerid)
+ # noinspection PyBroadException
+ try:
+ self.imdb_id = helpers.imdb_from_tvdbid_on_tvmaze(self.indexerid)
+ except Exception:
+ self.imdb_id = None
try:
client = Cinemagoer()
@@ -941,6 +945,8 @@ def load_imdb_info(self):
logger.debug(f"{self.indexerid}: Loading show info from IMDb")
imdb_title: dict = client.get_movie(self.imdb_id.strip("t"))
+ if not imdb_title:
+ return
self.imdb_info = {
"indexer_id": self.indexerid,
@@ -959,6 +965,8 @@ def load_imdb_info(self):
}
logger.debug(f"{self.indexerid}: Obtained info from IMDb ->{self.imdb_info}")
+ except KeyError:
+ logger.info(f"Could not get IMDB info for {self.name}")
except (
TypeError,
ValueError,
diff --git a/sickchill/views/api/webapi.py b/sickchill/views/api/webapi.py
index bfc23e9a22..0889763331 100644
--- a/sickchill/views/api/webapi.py
+++ b/sickchill/views/api/webapi.py
@@ -589,38 +589,24 @@ def _map_quality(show_quality):
def _get_root_dirs():
- if settings.ROOT_DIRS == "":
+ if not settings.ROOT_DIRS:
return {}
- root_dir = {}
root_dirs = settings.ROOT_DIRS.split("|")
- default_index = int(settings.ROOT_DIRS.split("|")[0])
- root_dir["default_index"] = int(settings.ROOT_DIRS.split("|")[0])
- # remove default_index value from list (this fixes the offset)
- root_dirs.pop(0)
-
- if len(root_dirs) < default_index:
+ try:
+ default_index = int(root_dirs[0])
+ except ValueError:
return {}
- # clean up the list - replace %xx escapes by their single-character equivalent
- root_dirs = [urllib.parse.unquote_plus(x) for x in root_dirs]
+ if default_index > len(root_dirs) - 1:
+ return {}
default_dir = root_dirs[default_index]
dir_list = []
- for root_dir in root_dirs:
- valid = 1
- # noinspection PyBroadException
- try:
- os.listdir(root_dir)
- except Exception:
- valid = 0
- default = 0
- if root_dir is default_dir:
- default = 1
-
- cur_dir = {"valid": valid, "location": root_dir, "default": default}
+ for root_dir in root_dirs[1:]:
+ cur_dir = {"valid": int(Path(root_dir).resolve().is_dir()), "location": root_dir, "default": int(root_dir is default_dir)}
dir_list.append(cur_dir)
return dir_list
@@ -1428,42 +1414,24 @@ def run(self):
"""Add a new root (parent) directory to SickChill"""
self.location = urllib.parse.unquote_plus(self.location)
- location_matched = 0
- index = 0
# disallow adding/setting an invalid dir
- if not os.path.isdir(self.location):
+ if not Path(self.location).is_dir():
return _responds(RESULT_FAILURE, msg="Location is invalid")
- root_dirs = []
-
- if settings.ROOT_DIRS == "":
+ if not settings.ROOT_DIRS:
self.default = 1
+ root_dirs = []
else:
root_dirs = settings.ROOT_DIRS.split("|")
- index = int(settings.ROOT_DIRS.split("|")[0])
- root_dirs.pop(0)
- # clean up the list - replace %xx escapes by their single-character equivalent
- root_dirs = [urllib.parse.unquote_plus(x) for x in root_dirs]
- for x in root_dirs:
- if x == self.location:
- location_matched = 1
- if self.default == 1:
- index = root_dirs.index(self.location)
- break
-
- if location_matched == 0:
- if self.default == 1:
- root_dirs.insert(0, self.location)
- else:
- root_dirs.append(self.location)
- root_dirs_new = [urllib.parse.unquote_plus(x) for x in root_dirs]
- root_dirs_new.insert(0, str(index))
- # noinspection PyCompatibility
- root_dirs_new = "|".join(str(x) for x in root_dirs_new)
+ if not self.location.lower() in [root_dir.lower() for root_dir in root_dirs[1:]]:
+ root_dirs.append(self.location)
- settings.ROOT_DIRS = root_dirs_new
+ if self.default:
+ root_dirs[0] = [root_dir.lower() for root_dir in root_dirs[1:]].index(self.location.lower())
+
+ settings.ROOT_DIRS = "|".join(f"{root_dir}" for root_dir in root_dirs)
return _responds(RESULT_SUCCESS, _get_root_dirs(), msg="Root directories updated")
@@ -1477,10 +1445,10 @@ def run(self):
data = {
"current_version": {
- "version": update_manager.get_current_version(),
+ "version": str(update_manager.get_current_version()),
},
"latest_version": {
- "version": update_manager.get_newest_version(),
+ "version": str(update_manager.get_newest_version()),
},
"version_delta": update_manager.get_version_delta(),
"needs_update": needs_update,
@@ -1557,36 +1525,25 @@ def __init__(self, args, kwargs):
def run(self):
"""Delete a root (parent) directory from SickChill"""
- if settings.ROOT_DIRS == "":
+ if not settings.ROOT_DIRS:
return _responds(RESULT_FAILURE, _get_root_dirs(), msg="No root directories detected")
- new_index = 0
- root_dirs_new = []
root_dirs = settings.ROOT_DIRS.split("|")
- index = int(root_dirs[0])
- root_dirs.pop(0)
- # clean up the list - replace %xx escapes by their single-character equivalent
- root_dirs = [urllib.parse.unquote_plus(x) for x in root_dirs]
- old_root_dir = root_dirs[index]
- for curRootDir in root_dirs:
- if not curRootDir == self.location:
- root_dirs_new.append(curRootDir)
- else:
- new_index = 0
- for curIndex, curNewRootDir in enumerate(root_dirs_new):
- if curNewRootDir is old_root_dir:
- new_index = curIndex
- break
+ self.location = urllib.parse.unquote_plus(self.location)
+
+ if self.location.lower() not in [root_dir.lower() for root_dir in root_dirs[1:]]:
+ return _responds(RESULT_FAILURE, _get_root_dirs(), msg="Root directory was not found, not changed")
- root_dirs_new = [urllib.parse.unquote_plus(x) for x in root_dirs_new]
- if root_dirs_new:
- root_dirs_new.insert(0, str(new_index))
- # noinspection PyCompatibility
- root_dirs_new = "|".join(str(x) for x in root_dirs_new)
+ index = [root_dir.lower() for root_dir in root_dirs[1:]].index(self.location.lower())
+
+ root_dirs.pop(index)
+
+ if str(index) == root_dirs[0]:
+ root_dirs[0] = 0
+
+ settings.ROOT_DIRS = "|".join(f"{root_dir}" for root_dir in root_dirs)
- settings.ROOT_DIRS = root_dirs_new
- # what if the root dir was not found?
return _responds(RESULT_SUCCESS, _get_root_dirs(), msg="Root directory deleted")
@@ -2080,15 +2037,17 @@ def run(self):
return _responds(RESULT_FAILURE, msg="An existing indexerid already exists in database")
if not self.location:
- if settings.ROOT_DIRS != "":
+ if settings.ROOT_DIRS:
root_dirs = settings.ROOT_DIRS.split("|")
- root_dirs.pop(0)
- default_index = int(settings.ROOT_DIRS.split("|")[0])
+ default_index = int(root_dirs[0])
+ if default_index > len(root_dirs) - 1:
+ return _responds(RESULT_FAILURE, msg="Default root directory is not set, please provide a location")
+
self.location = root_dirs[default_index]
else:
return _responds(RESULT_FAILURE, msg="Root directory is not set, please provide a location")
- if not os.path.isdir(self.location):
+ if not Path(self.location).is_dir():
return _responds(RESULT_FAILURE, msg="'" + self.location + "' is not a valid location")
# use default quality as a fail-safe
diff --git a/sickchill/views/home.py b/sickchill/views/home.py
index 15d8628649..e70e515c0f 100644
--- a/sickchill/views/home.py
+++ b/sickchill/views/home.py
@@ -72,16 +72,13 @@ def _getEpisode(show, season=None, episode=None, absolute=None):
def index(self):
t = PageTemplate(rh=self, filename="home.mako")
- selected_root = self.get_body_argument("root", None)
+ selected_root = self.get_body_argument("root", "")
if selected_root and settings.ROOT_DIRS:
- backend_pieces = settings.ROOT_DIRS.split("|")
- backend_dirs = backend_pieces[1:]
+ backend_dirs = settings.ROOT_DIRS.split("|")[1:]
try:
assert selected_root != "-1"
selected_root_dir = backend_dirs[int(selected_root)]
- if selected_root_dir[-1] not in ("/", "\\"):
- selected_root_dir += os.sep
except (IndexError, ValueError, TypeError, AssertionError):
selected_root_dir = ""
else:
@@ -90,7 +87,7 @@ def index(self):
shows = []
anime = []
for show in settings.show_list:
- if selected_root_dir in show.get_location:
+ if show.get_location.startswith(selected_root_dir):
if settings.ANIME_SPLIT_HOME and show.is_anime:
anime.append(show)
else:
@@ -112,7 +109,7 @@ def index(self):
def filter(self):
t = PageTemplate(rh=self, filename="home.mako")
- selected_root = self.get_body_argument("root", None)
+ selected_root = self.get_body_argument("root", "")
page = try_int(self.get_argument("p", default="0"))
limit = try_int(self.get_argument("limit", default=None))
kind = self.get_argument("type", "all")
@@ -121,13 +118,10 @@ def filter(self):
kind = "all"
if selected_root and settings.ROOT_DIRS:
- backend_pieces = settings.ROOT_DIRS.split("|")
- backend_dirs = backend_pieces[1:]
+ backend_dirs = settings.ROOT_DIRS.split("|")[1:]
try:
assert selected_root != "-1"
selected_root_dir = backend_dirs[int(selected_root)]
- if selected_root_dir[-1] not in ("/", "\\"):
- selected_root_dir += os.sep
except (IndexError, ValueError, TypeError, AssertionError):
selected_root_dir = ""
else:
@@ -136,7 +130,7 @@ def filter(self):
shows_to_show = []
skipped = 0
for show in settings.show_list:
- if selected_root_dir and selected_root_dir not in show.get_location:
+ if not show.get_location.startswith(selected_root_dir):
continue
if kind == "anime" and not show.is_anime:
@@ -744,8 +738,7 @@ def status(self):
root_dir = {}
if settings.ROOT_DIRS:
- backend_pieces = settings.ROOT_DIRS.split("|")
- backend_dirs = backend_pieces[1:]
+ backend_dirs = settings.ROOT_DIRS.split("|")[1:]
else:
backend_dirs = []
@@ -1496,7 +1489,7 @@ def updateEMBY(self, show=None):
def setStatus(self, direct=False):
if direct is True:
# noinspection PyUnresolvedReferences
- show = self.to_change_showa
+ show = self.to_change_show
# noinspection PyUnresolvedReferences
eps = self.to_change_eps
status = self.get_body_argument("newStatus")
diff --git a/sickchill/views/index.py b/sickchill/views/index.py
index b17cfdc03f..9f150cac1c 100644
--- a/sickchill/views/index.py
+++ b/sickchill/views/index.py
@@ -7,6 +7,7 @@
from concurrent.futures import ThreadPoolExecutor
from mimetypes import guess_type
from secrets import compare_digest
+from typing import Any
from urllib.parse import urljoin
from mako.exceptions import RichTraceback
@@ -25,11 +26,10 @@
try:
import jwt
- from jwt.algorithms import RSAAlgorithm as jwt_algorithms_RSAAlgorithm
-
- has_cryptography = True
-except Exception:
- has_cryptography = False
+ from jwt.algorithms import RSAAlgorithm
+except (ImportError, Exception):
+ jwt = None
+ RSAAlgorithm = None
class BaseHandler(RequestHandler):
@@ -109,22 +109,21 @@ def get_current_user(self):
return True
if settings.WEB_USERNAME and settings.WEB_PASSWORD:
+ # Logged into UI?
+ if self.get_signed_cookie("sickchill_user"):
+ return True
+
# Authenticate using jwt for CF Access
# NOTE: Setting a username and password is STILL required to protect poorly configured tunnels or firewalls
- if settings.CF_AUTH_DOMAIN and settings.CF_POLICY_AUD and has_cryptography:
- CERTS_URL = "{}/cdn-cgi/access/certs".format(settings.CF_AUTH_DOMAIN)
- if "CF_Authorization" in self.request.cookies:
- jwk_set = helpers.getURL(CERTS_URL, returns="json")
- if jwk_set:
- for key_dict in jwk_set["keys"]:
- public_key = jwt_algorithms_RSAAlgorithm.from_jwk(json.dumps(key_dict))
- if jwt.decode(self.request.cookies["CF_Authorization"], key=public_key, audience=settings.CF_POLICY_AUD):
+ if settings.CF_AUTH_DOMAIN and settings.CF_POLICY_AUD and jwt:
+ token = self.request.cookies.get("CF_Authorization") or self.request.headers.get("Cf-Access-Jwt-Assertion")
+ if token:
+ key_set = helpers.getURL(f"{settings.CF_AUTH_DOMAIN}/cdn-cgi/access/certs", returns="json")
+ if key_set:
+ for key in key_set["keys"]:
+ if jwt.decode(token, key=RSAAlgorithm.from_jwk(key), audience=settings.CF_POLICY_AUD):
return True
- # Logged into UI?
- if self.get_secure_cookie("sickchill_user"):
- return True
-
# Basic Auth at a minimum
auth_header = self.request.headers.get("Authorization")
if auth_header and auth_header.startswith("Basic "):
@@ -133,7 +132,6 @@ def get_current_user(self):
if compare_digest(username, settings.WEB_USERNAME) and compare_digest(password, settings.WEB_PASSWORD):
return True
return False
-
else:
# Local network
# strip / (%value/if_name) from remote_ip IPv6 scoped literal IP Addresses (RFC 4007) until phihag/ipaddress is updated tracking cpython 3.9.
@@ -149,11 +147,8 @@ def initialize(self):
self.executor = ThreadPoolExecutor(thread_name_prefix="WEBSERVER-" + self.__class__.__name__.upper())
@authenticated
- async def get(self, route, *args, **kwargs):
+ async def get(self, route, *_args, **_kwargs):
try:
- # logger.debug(f"Call for {route} with {args} and {kwargs}")
- # logger.debug(f"Call for {route} with args [{self.request.arguments}]")
-
# route -> method obj
route = route.strip("/").replace(".", "_").replace("-", "_") or "index"
# logger.debug(f"Route: {route}")
@@ -163,7 +158,7 @@ async def get(self, route, *args, **kwargs):
message = ("404", "Could not find the page you requested")
ui.notifications.error(*message)
logger.info(", ".join(message))
- helpers.add_site_message(", ".join(message), tag=message[0], level="danger")
+ helpers.add_site_message(", ".join(message), tag=message[0])
return self.redirect("/home/")
from inspect import signature
@@ -174,17 +169,15 @@ async def get(self, route, *args, **kwargs):
logger.debug(f"{route} has signature {sig} and needs updated to use get_*_argument to properly decode and sanitize argument values")
results = await self.async_call(method, len(sig.parameters))
-
try:
await self.finish(results)
except Exception as e:
- if settings.DEVELOPER:
+ if 0:
logger.debug(f"self.finish exception {e}, result {results}")
else:
logger.debug(f"self.finish exception {e}")
-
except AttributeError:
- logger.debug('Failed doing webui request "{0}": {1}'.format(route, traceback.format_exc()))
+ logger.debug(f"Failed doing webui request '{route}'", exc_info=True)
raise HTTPError(404)
@run_on_executor
@@ -198,24 +191,24 @@ def async_call(self, function, needs_params):
return function()
if self.request.method == "POST":
- get_argument = self.get_body_argument
- get_arguments = self.get_body_arguments
+ method_argument = self.get_body_argument
+ method_arguments = self.get_body_arguments
elif self.request.method == "GET":
- get_argument = self.get_query_argument
- get_arguments = self.get_query_arguments
+ method_argument = self.get_query_argument
+ method_arguments = self.get_query_arguments
else:
- get_argument = self.get_argument
- get_arguments = self.get_arguments
+ method_argument = self.get_argument
+ method_arguments = self.get_arguments
kwargs = {}
for arg, value in self.request.arguments.items():
if isinstance(value, str):
- kwargs[arg] = get_argument(arg)
+ kwargs[arg] = method_argument(arg)
elif isinstance(value, list):
if len(value) == 1:
- kwargs[arg] = get_argument(arg)
+ kwargs[arg] = method_argument(arg)
else:
- kwargs[arg] = get_arguments(arg)
+ kwargs[arg] = method_arguments(arg)
else:
raise Exception
return function(**kwargs)
@@ -231,8 +224,7 @@ def async_call(self, function, needs_params):
@Route("(.*)(/?)", name="index")
class WebRoot(WebHandler):
def print_traceback(self, error, *args, **kwargs):
- logger.info(f"A mako error occurred: {error}")
- logger.debug(traceback.format_exc())
+ logger.info(f"A mako error occurred: {error}", stack_info=True, exc_info=True)
logger.debug(f"args: {args}, kwargs: {kwargs}")
t = PageTemplate(rh=self, filename="500.mako")
kwargs["backtrace"] = RichTraceback(error=error)
@@ -251,7 +243,7 @@ def apibuilder(self):
episodes = {}
- results = main_db_con.select("SELECT episode, season, showid " "FROM tv_episodes " "ORDER BY season ASC, episode ASC")
+ results = main_db_con.select("SELECT episode, season, showid " "FROM tv_episodes " "ORDER BY season, episode")
for result in results:
if result["showid"] not in episodes:
@@ -428,7 +420,7 @@ def set_site_message(self):
else:
if self.get_current_user() and not (check_installed() or settings.DEVELOPER):
message = _("SickChill no longer is supported unless installed with pip or poetry. Source and git installs are for experienced users only")
- helpers.add_site_message(message, tag="not_installed", level="danger")
+ helpers.add_site_message(message, tag="not_installed")
return settings.SITE_MESSAGES
@@ -452,6 +444,6 @@ def sickchill_background(self):
def custom_css(self):
if settings.CUSTOM_CSS_PATH and os.path.isfile(settings.CUSTOM_CSS_PATH):
self.set_header("Content-Type", "text/css")
- with open(settings.CUSTOM_CSS_PATH, "r") as content:
+ with open(settings.CUSTOM_CSS_PATH) as content:
return content.read()
return None
diff --git a/sickchill/views/manage/add_shows.py b/sickchill/views/manage/add_shows.py
index 0e62cad875..e0a624ffab 100644
--- a/sickchill/views/manage/add_shows.py
+++ b/sickchill/views/manage/add_shows.py
@@ -65,7 +65,7 @@ def searchIndexersForShowName(self):
# Query Indexers for each search term and build the list of results
for index, indexer_object in sickchill.indexer:
- if int(indexer) and int(indexer) != index:
+ if indexer and indexer != index:
continue
logger.debug(
@@ -89,9 +89,9 @@ def searchIndexersForShowName(self):
final_results.extend(
{
(
- sickchill.indexer.name(index),
+ sickchill.indexer[index].name,
index,
- indexer_object.show_url,
+ sickchill.indexer[index].show_url,
show["id"],
show["seriesName"],
show["firstAired"],
@@ -108,7 +108,7 @@ def searchIndexersForShowName(self):
final_results.sort(key=lambda x: x[4].lower().index(search_term.lower()))
final_results.sort(key=lambda x: x[4].lower() == search_term.lower(), reverse=True)
- lang_id = sickchill.indexer.lang_dict()[lang]
+ lang_id = sickchill.indexer[indexer].lang_dict()[lang]
return json.dumps({"results": final_results, "langid": lang_id, "success": len(final_results) > 0})
def massAddTable(self):
@@ -117,17 +117,6 @@ def massAddTable(self):
if not root_dirs:
return _("No folders selected.")
- if settings.ROOT_DIRS:
- default_index = int(settings.ROOT_DIRS.split("|")[0])
- else:
- default_index = 0
-
- if len(root_dirs) > default_index:
- tmp = root_dirs[default_index]
- if tmp in root_dirs:
- root_dirs.remove(tmp)
- root_dirs.insert(0, tmp)
-
dir_list = []
main_db_con = db.DBConnection()
@@ -145,6 +134,7 @@ def massAddTable(self):
if not os.path.isdir(cur_path):
continue
# ignore Synology folders
+ # noinspection SpellCheckingInspection
if cur_file.lower() in ["#recycle", "@eadir"]:
continue
except Exception:
@@ -156,13 +146,9 @@ def massAddTable(self):
"display_dir": "" + os.path.dirname(cur_path) + os.sep + "" + os.path.basename(cur_path),
}
- # see if the folder is in KODI already
dir_results = main_db_con.select("SELECT indexer_id FROM tv_shows WHERE location = ? LIMIT 1", [cur_path])
- if dir_results:
- cur_dir["added_already"] = True
- else:
- cur_dir["added_already"] = False
+ cur_dir["added_already"] = bool(dir_results)
dir_list.append(cur_dir)
@@ -241,6 +227,7 @@ def trendingShows(self):
trakt_list = self.get_query_argument("traktList", default="anticipated").lower()
+ # noinspection SpellCheckingInspection
trakt_options = {
"anticipated": _("Most Anticipated Shows"),
"newshow": _("New Shows"),
@@ -272,6 +259,7 @@ def getTrendingShows(self):
trakt_list = self.get_query_argument("traktList", "").lower()
+ # noinspection SpellCheckingInspection
if trakt_list == "trending":
page_url = "shows/trending"
elif trakt_list == "popular":
@@ -303,12 +291,12 @@ def getTrendingShows(self):
return t.render(black_list=black_list, trending_shows=trending_shows)
def getTrendingShowImage(self):
- indexerId = self.get_body_argument("indexerId")
- image_url = sickchill.indexer.series_poster_url_by_id(indexerId)
+ indexer_id = self.get_body_argument("indexerId")
+ image_url = sickchill.indexer.series_poster_url_by_id(indexer_id)
if image_url:
- image_path = trakt_trending.get_image_path(trakt_trending.get_image_name(indexerId))
+ image_path = trakt_trending.get_image_path(trakt_trending.get_image_name(indexer_id))
trakt_trending.cache_image(image_url, image_path)
- return indexerId
+ return indexer_id
def popularShows(self):
"""
@@ -405,12 +393,12 @@ def addShowByID(self):
show_name = self.get_query_argument("show_name")
indexer = self.get_query_argument("indexer", default="TVDB")
- def add_error(existing: TVShow = None) -> None:
+ def add_error(in_list: TVShow = None) -> None:
title = f"Unable to add {show_name}"
message = f"Could not add {show_name} with {indexer}:{indexer_id}. We were unable to locate the tvdb id at this time."
- if existing:
- message = f"{existing.name} with {existing.indexerid} is already in your show list."
+ if in_list:
+ message = f"{in_list.name} with {in_list.indexerid} is already in your show list."
logger.info(" ".join([title, message]))
ui.notifications.error(title, message)
@@ -430,8 +418,6 @@ def add_error(existing: TVShow = None) -> None:
def addNewShow(
self,
- whichSeries=None,
- indexerLang=None,
rootDir=None,
defaultStatus=None,
quality_preset=None,
@@ -440,26 +426,25 @@ def addNewShow(
season_folders=None,
subtitles=None,
subtitles_sc_metadata=None,
- fullShowPath=None,
other_shows=None,
skipShow=None,
- providedIndexer=None,
anime=None,
scene=None,
- blacklist=[],
- whitelist=[],
+ blacklist=None,
+ whitelist=None,
defaultStatusAfter=None,
+ **_kwargs,
):
"""
Receive tvdb id, dir, and other options and create a show from them. If extra show dirs are
provided then it forwards back to newShow, if not it goes to /home.
"""
- indexerLang = self.get_body_argument("indexerLang", default=settings.INDEXER_DEFAULT_LANGUAGE)
+ indexer_language = self.get_body_argument("indexerLang", default=settings.INDEXER_DEFAULT_LANGUAGE)
# grab our list of other dirs if given
other_shows = self.get_arguments("other_shows")
- fullShowPath = self.get_argument("fullShowPath", default=None)
+ full_show_path = self.get_argument("fullShowPath", default=None)
def finishAddShow():
# if there are no extra shows then go home
@@ -477,17 +462,17 @@ def finishAddShow():
if skipShow:
return finishAddShow()
else:
- whichSeries = self.get_argument("whichSeries")
+ which_series = self.get_argument("whichSeries")
# sanity check on our inputs
- if (not rootDir and not fullShowPath) or not whichSeries:
+ if (not rootDir and not full_show_path) or not which_series:
return _("Missing params, no Indexer ID or folder: {show_to_add} and {root_dir}/{show_path}").format(
- show_to_add=whichSeries, root_dir=rootDir, show_path=fullShowPath
+ show_to_add=which_series, root_dir=rootDir, show_path=full_show_path
)
# figure out what show we're adding and where
- series_pieces = whichSeries.split("|")
- if (whichSeries and rootDir) or (whichSeries and fullShowPath and len(series_pieces) > 1):
+ series_pieces = which_series.split("|")
+ if (which_series and rootDir) or (which_series and full_show_path and len(series_pieces) > 1):
if len(series_pieces) < 6:
logger.error("Unable to add show due to show selection. Not enough arguments: {0}".format((repr(series_pieces))))
ui.notifications.error(_("Unknown error. Unable to add show due to problem with show selection."))
@@ -500,16 +485,16 @@ def finishAddShow():
else:
# if no indexer was provided use the default indexer set in General settings
indexer = int(self.get_argument("providedIndexer", default=settings.INDEXER_DEFAULT))
- indexer_id = int(whichSeries)
- show_name = os.path.basename(os.path.normpath(fullShowPath))
+ indexer_id = int(which_series)
+ show_name = os.path.basename(os.path.normpath(full_show_path))
# use the whole path if it's given, or else append the show name to the root dir to get the full show path
- if fullShowPath:
- show_dir = os.path.normpath(fullShowPath)
+ if full_show_path:
+ show_dir = os.path.normpath(full_show_path)
extra_check_dir = show_dir
else:
folder_name = show_name
- s = sickchill.indexer.series_by_id(indexerid=indexer_id, indexer=indexer, language=indexerLang)
+ s = sickchill.indexer.series_by_id(indexerid=indexer_id, indexer=indexer, language=indexer_language)
if settings.ADD_SHOWS_WITH_YEAR and s.firstAired:
try:
year = "({0})".format(dateutil.parser.parse(s.firstAired).year)
@@ -521,18 +506,18 @@ def finishAddShow():
show_dir = os.path.join(rootDir, sanitize_filename(folder_name))
extra_check_dir = os.path.join(rootDir, sanitize_filename(show_name))
- # blanket policy - if the dir exists you should have used "add existing show" numbnuts
- if (os.path.isdir(show_dir) or os.path.isdir(extra_check_dir)) and not fullShowPath:
+ # blanket policy - if the dir exists you should have used "add existing show"
+ if (os.path.isdir(show_dir) or os.path.isdir(extra_check_dir)) and not full_show_path:
ui.notifications.error(_("Unable to add show"), _("Folder {show_dir} exists already").format(show_dir=show_dir))
return self.redirect("/addShows/existingShows/")
# don't create show dir if config says not to
if settings.ADD_SHOWS_WO_DIR:
- logger.info("Skipping initial creation of " + show_dir + " due to config.ini setting")
+ logger.info(f"Skipping initial creation of {show_dir} due to config.ini setting")
else:
dir_exists = helpers.makeDir(show_dir)
if not dir_exists:
- logger.exception("Unable to create the folder " + show_dir + ", can't add the show")
+ logger.exception(f"Unable to create the folder {show_dir}, can't add the show")
ui.notifications.error(_("Unable to add show"), _("Unable to create the folder {show_dir}, can't add the show").format(show_dir=show_dir))
# Don't redirect to default page because user wants to see the new show
return self.redirect("/home/")
@@ -548,8 +533,12 @@ def finishAddShow():
if whitelist:
whitelist = short_group_names(whitelist)
+ else:
+ whitelist = []
if blacklist:
blacklist = short_group_names(blacklist)
+ else:
+ blacklist = []
if not anyQualities:
anyQualities = []
@@ -569,7 +558,7 @@ def finishAddShow():
default_status=int(defaultStatus),
quality=newQuality,
season_folders=season_folders,
- lang=indexerLang,
+ lang=indexer_language,
subtitles=subtitles,
subtitles_sc_metadata=subtitles_sc_metadata,
anime=anime,
diff --git a/sickchill/views/server_settings.py b/sickchill/views/server_settings.py
index 8dc3f338a7..c0e8da96d8 100644
--- a/sickchill/views/server_settings.py
+++ b/sickchill/views/server_settings.py
@@ -67,13 +67,6 @@ def __init__(self, options=None):
self.server = None
- # video root
- if settings.ROOT_DIRS:
- root_dirs = settings.ROOT_DIRS.split("|")
- self.video_root = root_dirs[int(root_dirs[0])]
- else:
- self.video_root = None
-
# web root
if self.options["web_root"]:
settings.WEB_ROOT = self.options["web_root"] = "/" + self.options["web_root"].strip("/")
@@ -154,9 +147,6 @@ def __init__(self, options=None):
{"path": os.path.join(self.options["data_root"], "fonts")},
name="fonts",
),
- # TODO: WTF is this?
- # url(rf'{self.options["web_root"]}/videos/(.*)', SickChillStaticFileHandler,
- # {"path": self.video_root}, name='videos')
],
)
diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index c61ddbeb29..c164334bef 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -33,8 +33,6 @@
full_sanitizeSceneName
get_show
is_hidden_folder
- real_path
- is_subdirectory
set_up_anidb_connection
makeZip
extractZip
@@ -253,20 +251,6 @@ def test_is_hidden_folder(self):
"""
pass
- def test_real_path(self):
- """
- Test real_path
- """
- assert helpers.real_path("/usr/SickChill/../root/real/path/") == helpers.real_path("/usr/root/real/path/")
-
- def test_is_subdirectory(self):
- """
- Test is_subdirectory
- """
- assert helpers.is_subdirectory(subdir_path="/usr/SickChill/Downloads/Unpack", topdir_path="/usr/SickChill/Downloads")
- assert helpers.is_subdirectory(subdir_path="/usr/SickChill/Downloads/testfile.tst", topdir_path="/usr/SickChill/Downloads/")
- assert not helpers.is_subdirectory(subdir_path="/usr/SickChill/Unpack", topdir_path="/usr/SickChill/Downloads")
-
class HelpersFileTests(unittest.TestCase):
"""