Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: cell resource monitoring #3822

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import synapse.lib.config as s_config
import synapse.lib.health as s_health
import synapse.lib.output as s_output
import synapse.lib.resmon as s_resmon
import synapse.lib.certdir as s_certdir
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.httpapi as s_httpapi
Expand Down Expand Up @@ -841,6 +842,11 @@ async def getReloadableSystems(self):
async def reload(self, subsystem=None):
    '''
    Forward a reload request to the wrapped cell.

    Args:
        subsystem: Passed through unchanged as the ``subsystem`` keyword
            argument of ``self.cell.reload()``. Defaults to None.

    Returns:
        Whatever ``self.cell.reload()`` returns.
    '''
    result = await self.cell.reload(subsystem=subsystem)
    return result

@adminapi()
async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False):
    '''
    Stream resource probes from the wrapped cell.

    Args:
        mint: Forwarded as ``mint`` to ``self.cell.getResourceProbes()``.
        maxt: Forwarded as ``maxt`` to ``self.cell.getResourceProbes()``.
        wait: Forwarded as ``wait`` to ``self.cell.getResourceProbes()``.

    Yields:
        Each item produced by ``self.cell.getResourceProbes()``, in order.
    '''
    probes = self.cell.getResourceProbes(mint=mint, maxt=maxt, wait=wait)
    async for probe in probes:
        yield probe

class Cell(s_nexus.Pusher, s_telepath.Aware):
'''
A Cell() implements a synapse micro-service.
Expand Down Expand Up @@ -1124,6 +1130,8 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):

await self._initCellBoot()

await self.initResourceMonitor()

# we need to know this pretty early...
self.ahasvcname = None
ahaname = self.conf.get('aha:name')
Expand Down Expand Up @@ -1281,6 +1289,23 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
# phase 5 - service networking
await self.initServiceNetwork()

async def initResourceMonitor(self):
    '''
    Spawn the resource monitor process and connect a telepath client to it.

    Sets ``self.resproc`` (the spawned process) and ``self.resclient``
    (a telepath client for the monitor's unix socket under ``self.dirn``).
    A background task waits for the process to exit and raises
    ``s_exc.SpawnExit`` inside that task if the exit code is non-zero.
    '''
    ctx = multiprocessing.get_context('spawn')

    def main():
        self.resproc = ctx.Process(target=s_resmon.main, args=(self.dirn, self.conf.asDict()))
        self.resproc.start()

    def wait():
        self.resproc.join()
        if self.resproc.exitcode:
            raise s_exc.SpawnExit(code=self.resproc.exitcode)

    def stop():
        # Do not leave the child (and the executor thread blocked in join())
        # running once the cell has been torn down.
        if self.resproc.is_alive():
            self.resproc.terminate()

    async def waitproc():
        # Wrap the executor future in a coroutine so it can be scheduled as a task.
        await s_coro.executor(wait)

    # Process start-up is blocking, so keep it off the event loop.
    await s_coro.executor(main)
    self.onfini(stop)

    # Build the URL from the same path the monitor listens on.
    path = s_common.genpath(self.dirn, 'resmon')
    self.resclient = await s_telepath.Client.anit(f'unix://{path}')
    self.onfini(self.resclient.fini)

    # Fire and forget: awaiting the task here would block cell startup
    # until the monitor process exits.
    self.schedCoro(waitproc())

def getPermDef(self, perm):
perm = tuple(perm)
if self.permlook is None:
Expand Down Expand Up @@ -4523,3 +4548,8 @@ def getCachedSslCtx(self, opts=None, verify=None):

key = tuple(sorted(opts.items()))
return self._sslctx_cache.get(key)

async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False):
    '''
    Stream resource probes from the resource monitor process.

    Args:
        mint: Forwarded as ``mint`` to the monitor's ``getResourceProbes()``.
        maxt: Forwarded as ``maxt`` to the monitor's ``getResourceProbes()``.
        wait: Forwarded as ``wait`` to the monitor's ``getResourceProbes()``.

    Yields:
        Each item produced by the monitor's ``getResourceProbes()``.
    '''
    # proxy() is a coroutine: it must be awaited to obtain the actual proxy
    # object rather than calling methods on the un-awaited coroutine.
    proxy = await self.resclient.proxy(timeout=3)
    async for item in proxy.getResourceProbes(mint=mint, maxt=maxt, wait=wait):
        yield item
81 changes: 81 additions & 0 deletions synapse/lib/resmon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import shutil
import asyncio

import synapse.common as s_common
import synapse.daemon as s_daemon

import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.thisplat as s_thisplat

class ResourceMonitor(s_base.Base):
    '''
    Periodically sample resource usage for a cell directory and store the samples.

    Samples ("probes") are written to the ``resmon:probes`` db of an LMDB slab
    under ``<dirn>/slabs/resmon.lmdb``, and the monitor shares itself over a
    telepath daemon listening on the unix socket ``<dirn>/resmon``.
    '''

    # Seconds to wait between probes.
    sleeptime = 10

    async def __anit__(self, dirn, conf):
        '''
        Open storage, start the daemon listener and schedule the probe loop.

        Args:
            dirn: The cell directory to monitor and store probes under.
            conf: A plain dict of the full cell configuration.
        '''
        await s_base.Base.__anit__(self)

        self.dirn = dirn
        self.conf = conf  # a plain dict of full cell conf
        self.windows = []
        self.backdirn = conf.get('backup:dir')

        # a smidge of storage...
        path = s_common.genpath(self.dirn, 'slabs', 'resmon.lmdb')
        # anit() is a coroutine and must be awaited to get the slab itself.
        self.slab = await s_lmdbslab.Slab.anit(path)
        self.onfini(self.slab.fini)
        self.slab.initdb('resmon:probes')

        # a dash of daemon
        self.dmon = await s_daemon.Daemon.anit()
        self.onfini(self.dmon.fini)
        self.dmon.share('*', self)

        # a little listen
        path = s_common.genpath(self.dirn, 'resmon')
        await self.dmon.listen(f'unix://{path}')

        # a lot of loops...
        self.schedCoro(self.runResMonLoop())

    async def runResMonLoop(self):
        '''
        Store one probe every ``sleeptime`` seconds until the monitor is fini'd.

        Each probe is keyed by its timestamp (the first element of the probe).
        '''
        # TODO consider "roll up" stats for larger windows?
        while not self.isfini:
            probe = self.getResProbe()
            lkey = s_common.int64en(probe[0])
            # LMDB stores bytes only, so the probe list must be serialized.
            lval = s_msgpack.en(probe)
            # ornot() awaits the result only if put() returns a coroutine, so
            # this works whether Slab.put is sync or async in this version.
            await s_coro.ornot(self.slab.put, lkey, lval, db='resmon:probes')
            await self.waitfini(self.sleeptime)

    def getResProbe(self):
        '''
        Take a single resource usage sample.

        Returns:
            list: ``[time, nexus index, (totalmem, availmem),
            (disk total, disk free), (backup total, backup free)]``.
            The backup tuple is ``(0, 0)`` when no ``backup:dir`` is configured.
        '''
        disk = shutil.disk_usage(self.dirn)

        totalmem = s_thisplat.getTotalMemory()
        availmem = s_thisplat.getAvailableMemory()

        backuse = (0, 0)
        if self.backdirn:
            back = shutil.disk_usage(self.backdirn)
            backuse = (back.total, back.free)

        # use the length of this list as a version
        # (designed for space optimization)
        return [
            s_common.now(),
            # NOTE(review): getNexsIndx() is not defined on this class in the
            # visible code - confirm where the nexus index should come from.
            self.getNexsIndx(),
            (totalmem, availmem),
            (disk.total, disk.free),
            backuse,
            # TODO any of these next stats might need pid?
            # TODO cpu stats / io throughput for volume
        ]

    async def getResourceProbes(self, mint=0, maxt=0xffffffffffffffff, wait=False):
        '''
        Placeholder for streaming stored probes; currently yields a single None.

        Args:
            mint: Currently unused.
            maxt: Currently unused.
            wait: Currently unused.
        '''
        # TODO yield and then window if wait... ( may require lock )
        yield None

def main(dirn, conf):
    '''
    Process entry point: run the resource monitor in a fresh event loop.

    Args:
        dirn: The cell directory, passed through to ``monitor()``.
        conf: A plain dict of cell configuration, passed through to ``monitor()``.

    Returns:
        The result of ``monitor()``.
    '''
    return asyncio.run(monitor(dirn, conf))

async def monitor(dirn, conf):
    '''
    Construct a ResourceMonitor and block until it has been fini'd.

    Args:
        dirn: The cell directory handed to the ResourceMonitor.
        conf: A plain dict of cell configuration handed to the ResourceMonitor.
    '''
    # TODO signal handlers for clean shutdown
    resmon = await ResourceMonitor.anit(dirn, conf)
    await resmon.waitfini()
Loading