Skip to content

Commit

Permalink
Fix updater taking a long time (#696)
Browse files Browse the repository at this point in the history
Replaces subprocess with asyncio.subprocess in some localplatformlinux functions and improves shutdown handling
Co-authored-by: AAGaming <[email protected]>
  • Loading branch information
suchmememanyskill authored Sep 1, 2024
1 parent c1f7ca7 commit a6e4bcf
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 31 deletions.
42 changes: 28 additions & 14 deletions backend/decky_loader/localplatform/localplatformlinux.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
from re import compile
from asyncio import Lock
from asyncio import Lock, create_subprocess_exec
from asyncio.subprocess import PIPE, DEVNULL, STDOUT, Process
from subprocess import call as call_sync
import os, pwd, grp, sys, logging
from subprocess import call, run, DEVNULL, PIPE, STDOUT
from typing import IO, Any, Mapping
from ..enums import UserType

logger = logging.getLogger("localplatform")

# subprocess._ENV
ENV = Mapping[str, str]
ProcessIO = int | IO[Any] | None
async def run(args: list[str], stdin: ProcessIO = DEVNULL, stdout: ProcessIO = PIPE, stderr: ProcessIO = PIPE, env: ENV | None = None) -> tuple[Process, bytes | None, bytes | None]:
proc = await create_subprocess_exec(args[0], *(args[1:]), stdin=stdin, stdout=stdout, stderr=stderr, env=env)
proc_stdout, proc_stderr = await proc.communicate()
return (proc, proc_stdout, proc_stderr)

# Get the user id hosting the plugin loader
def _get_user_id() -> int:
return pwd.getpwnam(_get_user()).pw_uid
Expand Down Expand Up @@ -54,7 +64,7 @@ def chown(path : str, user : UserType = UserType.HOST_USER, recursive : bool =
else:
raise Exception("Unknown User Type")

result = call(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path])
result = call_sync(["chown", "-R", user_str, path] if recursive else ["chown", user_str, path])
return result == 0

def chmod(path : str, permissions : int, recursive : bool = True) -> bool:
Expand Down Expand Up @@ -131,13 +141,17 @@ def setuid(user : UserType = UserType.HOST_USER):
os.setuid(user_id)

async def service_active(service_name : str) -> bool:
res = run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL)
res, _, _ = await run(["systemctl", "is-active", service_name], stdout=DEVNULL, stderr=DEVNULL)
return res.returncode == 0

async def service_restart(service_name : str) -> bool:
call(["systemctl", "daemon-reload"])
async def service_restart(service_name : str, block : bool = True) -> bool:
await run(["systemctl", "daemon-reload"])
cmd = ["systemctl", "restart", service_name]
res = run(cmd, stdout=PIPE, stderr=STDOUT)

if not block:
cmd.append("--no-block")

res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT)
return res.returncode == 0

async def service_stop(service_name : str) -> bool:
Expand All @@ -146,7 +160,7 @@ async def service_stop(service_name : str) -> bool:
return True

cmd = ["systemctl", "stop", service_name]
res = run(cmd, stdout=PIPE, stderr=STDOUT)
res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT)
return res.returncode == 0

async def service_start(service_name : str) -> bool:
Expand All @@ -155,13 +169,13 @@ async def service_start(service_name : str) -> bool:
return True

cmd = ["systemctl", "start", service_name]
res = run(cmd, stdout=PIPE, stderr=STDOUT)
res, _, _ = await run(cmd, stdout=PIPE, stderr=STDOUT)
return res.returncode == 0

async def restart_webhelper() -> bool:
logger.info("Restarting steamwebhelper")
# TODO move to pkill
res = run(["killall", "-s", "SIGTERM", "steamwebhelper"], stdout=DEVNULL, stderr=DEVNULL)
res, _, _ = await run(["killall", "-s", "SIGTERM", "steamwebhelper"], stdout=DEVNULL, stderr=DEVNULL)
return res.returncode == 0

def get_privileged_path() -> str:
Expand Down Expand Up @@ -241,12 +255,12 @@ async def close_cef_socket():
logger.warning("Can't close CEF socket as Decky isn't running as root.")
return
# Look for anything listening TCP on port 8080
lsof = run(["lsof", "-F", "-iTCP:8080", "-sTCP:LISTEN"], capture_output=True, text=True)
if lsof.returncode != 0 or len(lsof.stdout) < 1:
lsof, stdout, _ = await run(["lsof", "-F", "-iTCP:8080", "-sTCP:LISTEN"], stdout=PIPE)
if not stdout or lsof.returncode != 0 or len(stdout) < 1:
logger.error(f"lsof call failed in close_cef_socket! return code: {str(lsof.returncode)}")
return

lsof_data = cef_socket_lsof_regex.match(lsof.stdout)
lsof_data = cef_socket_lsof_regex.match(stdout.decode())

if not lsof_data:
logger.error("lsof regex match failed in close_cef_socket!")
Expand All @@ -258,7 +272,7 @@ async def close_cef_socket():
logger.info(f"Closing CEF socket with PID {pid} and FD {fd}")

# Use gdb to inject a close() call for the socket fd into steamwebhelper
gdb_ret = run(["gdb", "--nx", "-p", pid, "--batch", "--eval-command", f"call (int)close({fd})"], env={"LD_LIBRARY_PATH": ""})
gdb_ret, _, _ = await run(["gdb", "--nx", "-p", pid, "--batch", "--eval-command", f"call (int)close({fd})"], env={"LD_LIBRARY_PATH": ""})

if gdb_ret.returncode != 0:
logger.error(f"Failed to close CEF socket with gdb! return code: {str(gdb_ret.returncode)}", exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion backend/decky_loader/localplatform/localplatformwin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def service_stop(service_name : str) -> bool:
async def service_start(service_name : str) -> bool:
return True # Stubbed

async def service_restart(service_name : str) -> bool:
async def service_restart(service_name : str, block : bool = True) -> bool:
if service_name == "plugin_loader":
sys.exit(42)

Expand Down
7 changes: 4 additions & 3 deletions backend/decky_loader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,17 @@ async def shutdown(self, _: Application):
tasks = all_tasks()
current = current_task()
async def cancel_task(task: Task[Any]):
logger.debug(f"Cancelling task {task}")
name = task.get_coro().__qualname__
logger.debug(f"Cancelling task {name}")
try:
task.cancel()
try:
await task
except CancelledError:
pass
logger.debug(f"Task {task} finished")
logger.debug(f"Task {name} finished")
except:
logger.warning(f"Failed to cancel task {task}:\n" + format_exc())
logger.warning(f"Failed to cancel task {name}:\n" + format_exc())
pass
if current:
tasks.remove(current)
Expand Down
33 changes: 22 additions & 11 deletions backend/decky_loader/plugin/sandboxed_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from json import dumps, loads
from logging import getLogger
from traceback import format_exc
from asyncio import (get_event_loop, new_event_loop,
from asyncio import (ensure_future, get_event_loop, new_event_loop,
set_event_loop)
from signal import SIGINT, SIGTERM
from setproctitle import setproctitle, setthreadtitle

from .messages import SocketResponseDict, SocketMessageType
Expand Down Expand Up @@ -38,6 +39,7 @@ def __init__(self,
self.version = version
self.author = author
self.api_version = api_version
self.shutdown_running = False

self.log = getLogger("sandboxed_plugin")

Expand All @@ -48,7 +50,11 @@ def initialize(self, socket: LocalSocket):
setproctitle(f"{self.name} ({self.file})")
setthreadtitle(self.name)

set_event_loop(new_event_loop())
loop = new_event_loop()
set_event_loop(loop)
# When running Decky manually in a terminal, ctrl-c will trigger this, so we have to handle it properly
loop.add_signal_handler(SIGINT, lambda: ensure_future(self.shutdown()))
loop.add_signal_handler(SIGTERM, lambda: ensure_future(self.shutdown()))
if self.passive:
return
setgid(UserType.ROOT if "root" in self.flags else UserType.HOST_USER)
Expand Down Expand Up @@ -155,22 +161,27 @@ async def _uninstall(self):
self.log.error("Failed to uninstall " + self.name + "!\n" + format_exc())
pass

async def on_new_message(self, message : str) -> str|None:
data = loads(message)

if "stop" in data:
async def shutdown(self, uninstall: bool = False):
if not self.shutdown_running:
self.shutdown_running = True
self.log.info(f"Calling Loader unload function for {self.name}.")
await self._unload()

if data.get('uninstall'):
if uninstall:
self.log.info("Calling Loader uninstall function.")
await self._uninstall()

self.log.debug("Stopping event loop")
self.log.debug("Stopping event loop")

loop = get_event_loop()
loop.call_soon_threadsafe(loop.stop)
sys.exit(0)
loop = get_event_loop()
loop.call_soon_threadsafe(loop.stop)
sys.exit(0)

async def on_new_message(self, message : str) -> str|None:
data = loads(message)

if "stop" in data:
await self.shutdown(data.get('uninstall'))

d: SocketResponseDict = {"type": SocketMessageType.RESPONSE, "res": None, "success": True, "id": data["id"]}
try:
Expand Down
7 changes: 5 additions & 2 deletions backend/decky_loader/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

class RemoteVerAsset(TypedDict):
name: str
size: int
browser_download_url: str
class RemoteVer(TypedDict):
tag_name: str
Expand Down Expand Up @@ -198,11 +199,13 @@ async def do_update(self):

version = self.remoteVer["tag_name"]
download_url = None
size_in_bytes = None
download_filename = "PluginLoader" if ON_LINUX else "PluginLoader.exe"

for x in self.remoteVer["assets"]:
if x["name"] == download_filename:
download_url = x["browser_download_url"]
size_in_bytes = x["size"]
break

if download_url == None:
Expand Down Expand Up @@ -238,10 +241,10 @@ async def do_update(self):
os.mkdir(path.join(getcwd(), ".systemd"))
shutil.move(service_file_path, path.join(getcwd(), ".systemd")+"/plugin_loader.service")

await self.download_decky_binary(download_url, version)
await self.download_decky_binary(download_url, version, size_in_bytes=size_in_bytes)

async def do_restart(self):
await service_restart("plugin_loader")
await service_restart("plugin_loader", block=False)

async def do_shutdown(self):
await service_stop("plugin_loader")
Expand Down

0 comments on commit a6e4bcf

Please sign in to comment.