Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add linting to Python files in CI #365

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ jobs:
pushd frontend
npm install
npm run lint

- name: Run black (Python formatting)
uses: lgeiger/[email protected]
with:
args: "./backend --experimental-string-processing --config ./backend/pyproject.toml"

- name: Run ruff (Python linting)
uses: jpetrucciani/ruff-check@main
with:
path: "./backend"
120 changes: 84 additions & 36 deletions backend/browser.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
# Full imports
import json

# import pprint
# from pprint import pformat

# Partial imports
from aiohttp import ClientSession, web
from asyncio import get_event_loop, sleep
from concurrent.futures import ProcessPoolExecutor
from aiohttp import ClientSession
from asyncio import sleep
from hashlib import sha256
from io import BytesIO
from logging import getLogger
from os import R_OK, W_OK, path, rename, listdir, access, mkdir
from os import R_OK, W_OK, path, listdir, access, mkdir
from shutil import rmtree
from subprocess import call
from time import time
from zipfile import ZipFile

# Local modules
from helpers import get_ssl_context, get_user, get_user_group, download_remote_binary_to_path
from helpers import (
get_ssl_context,
get_user,
get_user_group,
download_remote_binary_to_path,
)
from injector import get_gamepadui_tab

logger = getLogger("Browser")


class PluginInstallContext:
def __init__(self, artifact, name, version, hash) -> None:
self.artifact = artifact
self.name = name
self.version = version
self.hash = hash


class PluginBrowser:
def __init__(self, plugin_path, plugins, loader) -> None:
self.plugin_path = plugin_path
Expand All @@ -43,49 +50,70 @@ def _unzip_to_plugin_dir(self, zip, name, hash):
zip_file = ZipFile(zip)
zip_file.extractall(self.plugin_path)
plugin_dir = self.find_plugin_folder(name)
code_chown = call(["chown", "-R", get_user()+":"+get_user_group(), plugin_dir])
code_chown = call(
["chown", "-R", get_user() + ":" + get_user_group(), plugin_dir]
)
code_chmod = call(["chmod", "-R", "555", plugin_dir])
if code_chown != 0 or code_chmod != 0:
logger.error(f"chown/chmod exited with a non-zero exit code (chown: {code_chown}, chmod: {code_chmod})")
logger.error(
f"chown/chmod exited with a non-zero exit code (chown: {code_chown},"
f" chmod: {code_chmod})"
)
return False
return True

async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath):
rv = False
try:
packageJsonPath = path.join(pluginBasePath, 'package.json')
pluginBinPath = path.join(pluginBasePath, 'bin')
packageJsonPath = path.join(pluginBasePath, "package.json")
pluginBinPath = path.join(pluginBasePath, "bin")

if access(packageJsonPath, R_OK):
with open(packageJsonPath, "r", encoding="utf-8") as f:
packageJson = json.load(f)
if "remote_binary" in packageJson and len(packageJson["remote_binary"]) > 0:
if (
"remote_binary" in packageJson
and len(packageJson["remote_binary"]) > 0
):
# create bin directory if needed.
rc=call(["chmod", "-R", "777", pluginBasePath])
call(["chmod", "-R", "777", pluginBasePath])
if access(pluginBasePath, W_OK):

if not path.exists(pluginBinPath):
mkdir(pluginBinPath)

if not access(pluginBinPath, W_OK):
rc=call(["chmod", "-R", "777", pluginBinPath])
call(["chmod", "-R", "777", pluginBinPath])

rv = True
for remoteBinary in packageJson["remote_binary"]:
# Required Fields. If any Remote Binary is missing these fail the install.
binName = remoteBinary["name"]
binURL = remoteBinary["url"]
binHash = remoteBinary["sha256hash"]
if not await download_remote_binary_to_path(binURL, binHash, path.join(pluginBinPath, binName)):
if not await download_remote_binary_to_path(
binURL, binHash, path.join(pluginBinPath, binName)
):
rv = False
raise Exception(f"Error Downloading Remote Binary {binName}@{binURL} with hash {binHash} to {path.join(pluginBinPath, binName)}")

code_chown = call(["chown", "-R", get_user()+":"+get_user_group(), self.plugin_path])
rc=call(["chmod", "-R", "555", pluginBasePath])
raise Exception(
"Error Downloading Remote Binary"
f" {binName}@{binURL} with hash {binHash} to"
f" {path.join(pluginBinPath, binName)}"
)

call(
[
"chown",
"-R",
get_user() + ":" + get_user_group(),
self.plugin_path,
]
)
call(["chmod", "-R", "555", pluginBasePath])
else:
rv = True
logger.debug(f"No Remote Binaries to Download")
logger.debug("No Remote Binaries to Download")

except Exception as e:
rv = False
logger.debug(str(e))
Expand All @@ -95,12 +123,16 @@ async def _download_remote_binaries_for_plugin_with_name(self, pluginBasePath):
def find_plugin_folder(self, name):
for folder in listdir(self.plugin_path):
try:
with open(path.join(self.plugin_path, folder, 'plugin.json'), "r", encoding="utf-8") as f:
with open(
path.join(self.plugin_path, folder, "plugin.json"),
"r",
encoding="utf-8",
) as f:
plugin = json.load(f)

if plugin['name'] == name:
if plugin["name"] == name:
return str(path.join(self.plugin_path, folder))
except:
except Exception:
logger.debug(f"skipping {folder}")

async def uninstall_plugin(self, name):
Expand All @@ -127,8 +159,10 @@ async def uninstall_plugin(self, name):
except FileNotFoundError:
logger.warning(f"Plugin {name} not installed, skipping uninstallation")
except Exception as e:
logger.error(f"Plugin {name} in {self.find_plugin_folder(name)} was not uninstalled")
logger.error(f"Error at %s", exc_info=e)
logger.error(
f"Plugin {name} in {self.find_plugin_folder(name)} was not uninstalled"
)
logger.error("Error at %s", exc_info=e)
if self.loader.watcher:
self.loader.watcher.disabled = False

Expand All @@ -140,8 +174,11 @@ async def _install(self, artifact, name, version, hash):
pluginFolderPath = self.find_plugin_folder(name)
if pluginFolderPath:
isInstalled = True
except:
logger.error(f"Failed to determine if {name} is already installed, continuing anyway.")
except Exception:
logger.error(
f"Failed to determine if {name} is already installed, continuing"
" anyway."
)
logger.info(f"Installing {name} (Version: {version})")
async with ClientSession() as client:
logger.debug(f"Fetching {artifact}")
Expand All @@ -155,22 +192,26 @@ async def _install(self, artifact, name, version, hash):
try:
logger.debug("Uninstalling existing plugin...")
await self.uninstall_plugin(name)
except:
except Exception:
logger.error(f"Plugin {name} could not be uninstalled.")
logger.debug("Unzipping...")
ret = self._unzip_to_plugin_dir(res_zip, name, hash)
if ret:
plugin_dir = self.find_plugin_folder(name)
ret = await self._download_remote_binaries_for_plugin_with_name(plugin_dir)
ret = await self._download_remote_binaries_for_plugin_with_name(
plugin_dir
)
if ret:
logger.info(f"Installed {name} (Version: {version})")
if name in self.loader.plugins:
self.loader.plugins[name].stop()
self.loader.plugins.pop(name, None)
await sleep(1)
self.loader.import_plugin(path.join(plugin_dir, "main.py"), plugin_dir)
self.loader.import_plugin(
path.join(plugin_dir, "main.py"), plugin_dir
)
else:
logger.fatal(f"Failed Downloading Remote Binaries")
logger.fatal("Failed Downloading Remote Binaries")
else:
self.log.fatal(f"SHA-256 Mismatch!!!! {name} (Version: {version})")
if self.loader.watcher:
Expand All @@ -180,14 +221,21 @@ async def _install(self, artifact, name, version, hash):

async def request_plugin_install(self, artifact, name, version, hash):
request_id = str(time())
self.install_requests[request_id] = PluginInstallContext(artifact, name, version, hash)
self.install_requests[request_id] = PluginInstallContext(
artifact, name, version, hash
)
tab = await get_gamepadui_tab()
await tab.open_websocket()
await tab.evaluate_js(f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}', '{request_id}', '{hash}')")
await tab.evaluate_js(
f"DeckyPluginLoader.addPluginInstallPrompt('{name}', '{version}',"
f" '{request_id}', '{hash}')"
)

async def confirm_plugin_install(self, request_id):
request = self.install_requests.pop(request_id)
await self._install(request.artifact, request.name, request.version, request.hash)
await self._install(
request.artifact, request.name, request.version, request.hash
)

def cancel_plugin_install(self, request_id):
self.install_requests.pop(request_id)
Loading