From a6bb6a94364387fda76fa3c77cf9f5ed27f64f14 Mon Sep 17 00:00:00 2001 From: visi Date: Mon, 22 Jul 2024 17:44:40 -0400 Subject: [PATCH] WIP: cell resource monitoring --- synapse/lib/cell.py | 30 ++++++++++++++++ synapse/lib/resmon.py | 81 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 synapse/lib/resmon.py diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 77eb2946de..dbb274a2a1 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -42,6 +42,7 @@ import synapse.lib.config as s_config import synapse.lib.health as s_health import synapse.lib.output as s_output +import synapse.lib.resmon as s_resmon import synapse.lib.certdir as s_certdir import synapse.lib.dyndeps as s_dyndeps import synapse.lib.httpapi as s_httpapi @@ -841,6 +842,11 @@ async def getReloadableSystems(self): async def reload(self, subsystem=None): return await self.cell.reload(subsystem=subsystem) + @adminapi() + async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False): + async for item in self.cell.getResourceProbes(mint=mint, maxt=maxt, wait=wait): + yield item + class Cell(s_nexus.Pusher, s_telepath.Aware): ''' A Cell() implements a synapse micro-service. @@ -1124,6 +1130,8 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): await self._initCellBoot() + await self.initResourceMonitor() + # we need to know this pretty early... self.ahasvcname = None ahaname = self.conf.get('aha:name') @@ -1281,6 +1289,23 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None): # phase 5 - service networking await self.initServiceNetwork() + async def initResourceMonitor(): + ctx = multiprocessing.get_context('spawn') + + def main(): + self.resproc = ctx.Process(target=s_resmon.main, args=(self.dirn, self.conf.asDict())) + self.resproc.start() + + def wait(): + self.resproc.join() + if self.resproc.exitcode: + raise s_exc.SpawnExit(code=proc.exitcode) + + await s_coro.executor(main) + self.resclient = s_telepath.Client('unix:///{self.dirn}/resmon') + + await self.schedCoro(s_coro.executor(wait)) + def getPermDef(self, perm): perm = tuple(perm) if self.permlook is None: @@ -4523,3 +4548,8 @@ def getCachedSslCtx(self, opts=None, verify=None): key = tuple(sorted(opts.items())) return self._sslctx_cache.get(key) + + async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False): + proxy = self.resclient.proxy(timeout=3) + async for item in proxy.getResourceProbes(mint=mint, maxt=maxt, wait=wait): + yield item diff --git a/synapse/lib/resmon.py b/synapse/lib/resmon.py new file mode 100644 index 0000000000..f6d2e72a35 --- /dev/null +++ b/synapse/lib/resmon.py @@ -0,0 +1,81 @@ +import shutil +import asyncio + +import synapse.common as s_common +import synapse.daemon as s_daemon + +import synapse.lib.base as s_base +import synapse.lib.lmdbslab as s_lmdbslab + +class ResourceMonitor(s_base.Base): + + sleeptime = 10 + + async def __anit__(self, dirn, conf): + + await s_base.Base.__anit__(self) + + self.dirn = dirn + self.conf = conf # a plain dict of full cell conf + self.windows = [] + self.backdirn = conf.get('backup:dir') + + # a smidge of storage... + path = s_common.genpath(self.dirn, 'slabs', 'resmon.lmdb') + self.slab = s_lmdbslab.Slab.anit(path) + self.slab.initdb('resmon:probes') + + # a dash of daemon + self.dmon = await s_daemon.Daemon.anit() + self.dmon.share('*', self) + + # a little listen + path = s_common.genpath(self.dirn, 'resmon') + await self.dmon.listen(f'unix://{path}') + + # a lot of loops... + self.schedCoro(self.runResMonLoop()) + + async def runResMonLoop(self): + + # TODO consider "roll up" stats for larger windows? + while not self.isfini: + probe = self.getResProbe() + self.slab.put(s_common.int64en(probe[0]), probe, db='resmon:probes') + await self.waitfini(self.sleeptime) + + def getResProbe() + + disk = shutil.disk_usage(self.dirn) + + totalmem = s_thisplat.getTotalMemory() + availmem = s_thisplat.getAvailableMemory() + + backuse = (0, 0) + if self.backdirn: + back = shutil.disk_usage(self.backdirn) + backuse = (back.total, back.free) + + # use the length of this list as a version + # (designed for space optimization) + return [ + s_common.now(), + self.getNexsIndx(), + (totalmem, availmem), + (disk.total, disk.free), + backuse, + # TODO any of these next stats might need pid? + # TODO cpu stats / io throughput for volume + ] + + async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False): + # TODO yield and then window if wait... ( may require lock ) + yield None + +def main(dirn, conf) + return asyncio.run(monitor(dirn, conf)) + +async def monitor(dirn, conf): + # TODO signal handlers for clean shutdown + rmon = await ResourceMonitor.anit(dirn, conf) + await rmon.waitfini()