Skip to content

Commit

Permalink
Improve app stopping performance
Browse files Browse the repository at this point in the history
Avoid blocking ui thread, use Fut for some serializations to parallelize
  • Loading branch information
sghpjuikit committed Jan 14, 2025
1 parent 800b188 commit 0c9b4a4
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 60 deletions.
9 changes: 5 additions & 4 deletions src/player/main/sp/it/pl/audio/PlayerState.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import sp.it.pl.layout.feature.PlaylistFeature
import sp.it.pl.main.APP
import sp.it.pl.main.AppError
import sp.it.pl.main.ifErrorNotify
import sp.it.util.async.future.Fut
import sp.it.util.collections.setTo
import sp.it.util.dev.failIfNotFxThread
import sp.it.util.dev.stacktraceAsString
Expand All @@ -34,8 +35,7 @@ class PlayerState {
playback = s.playback.toDomain()
}

@Blocking
fun serialize() {
fun serialize(): Fut<Unit> {
failIfNotFxThread()

val playlistsActive = APP.widgetManager.widgets.findAll(OPEN)
Expand All @@ -45,8 +45,9 @@ class PlayerState {
playlistId = PlaylistManager.active
playlists setTo PlaylistManager.playlists.filter { it.id in playlistsActive }

CoreSerializer.useAtomically {
writeSingleStorage(PlayerStateDB(this@PlayerState))
val db = PlayerStateDB(this@PlayerState)
return CoreSerializer.useAtomically {
writeSingleStorage(db)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/player/main/sp/it/pl/core/CoreSerializer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import sp.it.pl.audio.tagging.Metadata
import sp.it.pl.main.APP
import sp.it.pl.main.App.Rank.SLAVE
import sp.it.util.async.actor.ActorSe
import sp.it.util.async.future.Fut
import sp.it.util.dev.ThreadSafe
import sp.it.util.dev.printExecutionTime
import sp.it.util.file.div
Expand Down Expand Up @@ -97,7 +98,7 @@ object CoreSerializer: Core {
serializerActor.close()

@ThreadSafe
fun useAtomically(block: CoreSerializer.() -> Unit): Unit =
fun useAtomically(block: CoreSerializer.() -> Unit): Fut<Unit> =
serializerActor(block)

/**
Expand Down
67 changes: 40 additions & 27 deletions src/player/main/sp/it/pl/plugin/PluginManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import de.jensd.fx.glyphs.GlyphIcons
import io.github.oshai.kotlinlogging.KotlinLogging
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections.observableArrayList
import javafx.util.Duration
import kotlin.reflect.KClass
import kotlin.reflect.full.companionObjectInstance
import kotlin.reflect.full.primaryConstructor
import kotlin.reflect.jvm.jvmName
import kotlin.time.measureTime
import sp.it.pl.main.APP
import sp.it.pl.main.App.Rank.MASTER
import sp.it.pl.main.AppEventLog
import sp.it.pl.main.AppSettings
import sp.it.pl.main.run1AppReady
import sp.it.pl.plugin.PluginManager.Events.PluginInstalled
import sp.it.pl.plugin.PluginManager.Events.PluginStarted
import sp.it.pl.plugin.PluginManager.Events.PluginStarting
import sp.it.pl.plugin.PluginManager.Events.PluginStopped
import sp.it.pl.plugin.PluginManager.Events.PluginStopping
import sp.it.util.Locatable
import sp.it.util.access.vn
import sp.it.util.collections.ObservableListRO
Expand Down Expand Up @@ -48,6 +52,7 @@ import sp.it.util.functional.toUnit
import sp.it.util.reactive.Disposer
import sp.it.util.reactive.Subscription
import sp.it.util.reactive.on
import sp.it.util.units.javafx
import sp.it.util.units.version

class PluginManager: GlobalConfigDelegator {
Expand Down Expand Up @@ -103,10 +108,14 @@ class PluginManager: GlobalConfigDelegator {
object Events {
/** At the time the event is invoked, the plugin is not running. */
data class PluginInstalled<T: PluginBase>(val plugin: PluginBox<T>)
/** At the time the event is invoked, the plugin has not started and [PluginBox.plugin] is null. */
data class PluginStarting<T: PluginBase>(val plugin: PluginBox<T>)
/** At the time the event is invoked, the plugin has started and [PluginBox.plugin] is not null. */
data class PluginStarted<T: PluginBase>(val plugin: PluginBox<T>)
data class PluginStarted<T: PluginBase>(val plugin: PluginBox<T>, val stoppedIn: Duration)
/** At the time the event is invoked, the plugin is still running and [PluginBox.plugin] is not null. */
data class PluginStopped<T: PluginBase>(val plugin: PluginBox<T>)
data class PluginStopping<T: PluginBase>(val plugin: PluginBox<T>)
/** At the time the event is invoked, the plugin is not running and [PluginBox.plugin] is null. */
data class PluginStopped<T: PluginBase>(val plugin: PluginBox<T>, val stoppedIn: Duration)
}

}
Expand All @@ -120,14 +129,14 @@ class PluginRef<P: PluginBase>(val type: KClass<P>) {
APP.plugins.isInstalled(type) -> {
Subscription(
APP.actionStream.onEvent<PluginStarted<P>>({ type.isInstance(it.plugin.plugin) }) { block(it.plugin.plugin!!) on disposer },
APP.actionStream.onEvent<PluginStopped<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
APP.actionStream.onEvent<PluginStopping<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
Subscription { disposer() }
)
}
else -> APP.actionStream.onEvent<PluginInstalled<P>>({ type.isInstance(it.plugin.plugin) }) {
Subscription(
APP.actionStream.onEvent<PluginStarted<P>>({ type.isInstance(it.plugin.plugin) }) { block(it.plugin.plugin!!) on disposer },
APP.actionStream.onEvent<PluginStopped<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
APP.actionStream.onEvent<PluginStopping<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
Subscription { disposer() }
)
}
Expand All @@ -143,14 +152,14 @@ class PluginRef<P: PluginBase>(val type: KClass<P>) {
APP.plugins.isInstalled(type) -> {
Subscription(
APP.actionStream.onEvent<PluginStarted<P>>({ type.isInstance(it.plugin.plugin) }) { block(it.plugin.plugin!!) on disposer },
APP.actionStream.onEvent<PluginStopped<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
APP.actionStream.onEvent<PluginStopping<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
Subscription { disposer() }
)
}
else -> APP.actionStream.onEvent<PluginInstalled<P>>({ it.plugin.plugin is P }) {
Subscription(
APP.actionStream.onEvent<PluginStarted<P>>({ type.isInstance(it.plugin.plugin) }) { block(it.plugin.plugin!!) on disposer },
APP.actionStream.onEvent<PluginStopped<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
APP.actionStream.onEvent<PluginStopping<P>>({ type.isInstance(it.plugin.plugin) }) { disposer() },
Subscription { disposer() }
)
}
Expand Down Expand Up @@ -250,21 +259,23 @@ class PluginBox<T: PluginBase>(val type: KClass<T>, val isEnabledByDefault: Bool
fun start() {
failIfNotFxThread()
if (isRunning()) return
logger.info { "Starting plugin $type" }

plugin = runTry {
val constructor = type.primaryConstructor?.takeIf { it.parameters.isEmpty() } ?: fail { "Plugin must have a primary no-arg constructor" }
constructor.call().apply {
collectActionsOf(this)
APP.configuration.collect(this)
APP.configuration.rawSet(this)
start()

APP.actionStream(PluginStarting(this@PluginBox))
var time = measureTime {
plugin = runTry {
val constructor = type.primaryConstructor?.takeIf { it.parameters.isEmpty() } ?: fail { "Plugin must have a primary no-arg constructor" }
constructor.call().apply {
collectActionsOf(this)
APP.configuration.collect(this)
APP.configuration.rawSet(this)
start()
}
}.orNull {
logger.error(it) { "Instantiating plugin $type failed" }
}
}.orNull {
logger.error(it) { "Instantiating plugin $type failed" }
}

if (plugin!=null) APP.actionStream(PluginStarted(this@PluginBox))
if (plugin!=null) APP.actionStream(PluginStarted(this@PluginBox, time.javafx))
}

fun isRunning(): Boolean = plugin!=null
Expand All @@ -273,18 +284,20 @@ class PluginBox<T: PluginBase>(val type: KClass<T>, val isEnabledByDefault: Bool
fun stop() {
failIfNotFxThread()
if (!isRunning()) return
logger.info { "Stopping plugin $type" }

APP.actionStream(PluginStopping(this@PluginBox))
plugin?.apply {
APP.actionStream(PluginStopped(this@PluginBox))
APP.configuration.rawAdd(this)
APP.configuration.drop(this)
plugin = null
runTry {
stop()
}.orNull {
logger.error(it) { "Error while stopping plugin $type" }
var time = measureTime {
APP.configuration.rawAdd(this)
APP.configuration.drop(this)
plugin = null
runTry {
stop()
}.orNull {
logger.error(it) { "Error while stopping plugin $type" }
}
}
APP.actionStream(PluginStopped(this@PluginBox, time.javafx))
}
}

Expand Down
23 changes: 14 additions & 9 deletions src/player/main/sp/it/pl/plugin/impl/VoiceAssistant.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import java.io.File
import java.io.IOException
import java.io.InputStream
import java.lang.ProcessBuilder.Redirect.PIPE
import kotlin.concurrent.thread
import kotlin.time.measureTime
import kotlinx.coroutines.invoke
import kotlinx.coroutines.runBlocking
import sp.it.pl.audio.audioInputDeviceNames
Expand Down Expand Up @@ -52,6 +54,7 @@ import sp.it.util.async.future.Fut.Companion.fut
import sp.it.util.async.future.awaitFx
import sp.it.util.async.future.awaitFxOrBlock
import sp.it.util.async.runFX
import sp.it.util.async.runNew
import sp.it.util.async.runOn
import sp.it.util.collections.list.DestructuredList
import sp.it.util.collections.observableList
Expand Down Expand Up @@ -92,6 +95,7 @@ import sp.it.util.conf.valuesUnsealed
import sp.it.util.dev.doNothing
import sp.it.util.dev.fail
import sp.it.util.dev.markUsed
import sp.it.util.dev.printIt
import sp.it.util.file.children
import sp.it.util.file.div
import sp.it.util.file.hasExtension
Expand Down Expand Up @@ -136,6 +140,7 @@ import sp.it.util.text.split2
import sp.it.util.text.splitNoEmpty
import sp.it.util.text.splitTrimmed
import sp.it.util.text.useStrings
import sp.it.util.units.javafx
import sp.it.util.units.seconds
import sp.it.util.units.uuid

Expand Down Expand Up @@ -699,21 +704,21 @@ class VoiceAssistant: PluginBase() {
private fun llmOpenAiServerStopCommandCompute(on: Bool) =
llmOpenAiServerStopCommand.value.takeIf { on && llmEngine.value==LlmEngine.OPENAI }?.replace("\$model", llmOpenAiModel.value)

private fun llmOpenAiServerStart(command: String?) =
private fun llmOpenAiServerStart(command: String?, blockUi: Boolean = true) =
command.net { c ->
runTry {
if (c!=null && c.isNotBlank() && !c.startsWith("!"))
runCommandWithOutput(c).withAppProgress("Start LLM server").awaitFxOrBlock()
runCommandWithOutput(c).withAppProgress("Start LLM server").apply { if (blockUi) awaitFxOrBlock() }
}.ifError {
logger.error(it) { "Failed to run start command=$c" }
}
}

private fun llmOpenAiServerStop(on: Bool) =
private fun llmOpenAiServerStop(on: Bool, blockUi: Boolean = true) =
llmOpenAiServerStopCommandCompute(on).net { c ->
runTry {
if (c!=null && c.isNotBlank() && !c.startsWith("!"))
runCommandWithOutput(c).withAppProgress("Stop LLM server").awaitFxOrBlock()
runCommandWithOutput(c).withAppProgress("Stop LLM server").apply { if (blockUi) awaitFxOrBlock() }
}.ifError {
logger.error(it) { "Failed to run stop command=$c" }
}
Expand Down Expand Up @@ -862,10 +867,10 @@ class VoiceAssistant: PluginBase() {
}

override fun stop() {
isRunning = false
stopSpeechRecognition(true)
writing.closeAndWait()
onClose()
isRunning = false
stopSpeechRecognition(true)
writing.close()
onClose()
}

private fun startSpeechRecognition(runLlmServerCommand: Bool) {
Expand All @@ -876,7 +881,7 @@ class VoiceAssistant: PluginBase() {
private fun stopSpeechRecognition(runLlmServerCommand: Bool) {
write("EXIT")
setup = null
llmOpenAiServerStop(runLlmServerCommand)
llmOpenAiServerStop(runLlmServerCommand, blockUi = false)
}

@IsAction(name = "Restart Voice Assistant", info = "Restarts Voice Assistant python program")
Expand Down
40 changes: 21 additions & 19 deletions src/player/main/sp/it/pl/ui/objects/window/stage/WindowManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -514,43 +514,45 @@ class WindowManager: GlobalSubConfigDelegator(confWindow.name) {
}
}

fun serialize(appCloses: Boolean) {
fun serialize(appCloses: Boolean): Fut<Unit> {
failIfNotFxThread()
isSerializedWithAppClose = !appCloses
serializing.withLock {
// prevent serializing multiple times, in ui-less mode this can overwrite serialized state with empty
if (isSerialized) {
logger.info { "Serializing windows skipped. Already done." }
return
return fut()
}

// make sure directory is accessible
val dir = APP.location.user.layouts.current
if (!isValidatedDirectory(dir)) {
logger.error { "Serializing windows failed. $dir not accessible." }
return
return fut()
}

val filesOld = dir.children().toSet()
val ws = windows.filter { it!==dockWindow?.window && it.layout!=null }
val ws = windows.filter { it!==dockWindow?.window && it.layout!=null }.map { WindowDb(it) }
logger.info { "Serializing ${ws.size} application windows" }

// serialize - for now each window to its own file with .ws extension
val sessionUniqueName = System.currentTimeMillis().toString()
var isError = false
val filesNew = HashSet<File>()
for (i in ws.indices) {
val w = ws[i]
val f = dir/"window_${sessionUniqueName}_$i.ws"
filesNew += f
isError = isError or APP.serializerJson.toJson(WindowDb(w), f).isError
if (isError) break
}
return runVT {
// serialize - for now each window to its own file with .ws extension
val sessionUniqueName = System.currentTimeMillis().toString()
var isError = false
val filesNew = HashSet<File>()
for (i in ws.indices) {
val w = ws[i]
val f = dir/"window_${sessionUniqueName}_$i.ws"
filesNew += f
isError = isError or APP.serializerJson.toJson(w, f).isError
if (isError) break
}

// remove unneeded files, either old or new session will remain
(if (isError) filesNew else filesOld).forEach { it.delete() }
// remove unneeded files, either old or new session will remain
(if (isError) filesNew else filesOld).forEach { it.delete() }

isSerialized = true
isSerialized = true
}
}
}

Expand All @@ -576,7 +578,7 @@ class WindowManager: GlobalSubConfigDelegator(confWindow.name) {
return fut()
}

return runVT<List<WindowDb>> {
return runVT {
logger.info { "Deserializing windows..." }
val dir = APP.location.user.layouts.current
if (isValidatedDirectory(dir)) {
Expand Down

0 comments on commit 0c9b4a4

Please sign in to comment.