diff --git a/synapse/lib/ast.py b/synapse/lib/ast.py index 23cc4acdd2..f2f0a719c7 100644 --- a/synapse/lib/ast.py +++ b/synapse/lib/ast.py @@ -1,3 +1,4 @@ +import time import types import asyncio import decimal @@ -183,6 +184,128 @@ def hasVarName(self, name): class LookList(AstNode): pass +class Trace: + + def __init__(self, runt): + self.runt = runt + self.traceopers = [] + # TODO how to track edits per oper? + # TODO ensure we can handle recursion + # and make sense of the resulting messages + # TODO decide how to specify / implement recursion + + async def wrapgenr(self, oper, genr): + # sudo make me a genr sandwich + pref = TracePreOper(self, oper, genr) + post = TracePostOper(self, oper, pref.wrapgenr()) + self.traceopers.append((pref, oper, post)) + return post.wrapgenr() + + def report(self): + retn = [] + for (pref, oper, post) in self.traceopers: + + stats = pref.stats() + stats.update(post.stats()) + + retn.append({ + 'stats': stats, + 'posinfo': post.posinfo, + }) + return retn + +class PathTraceInfo: + + def __init__(self): + self.times = [] + self.ticks = collections.deque() + + def pack(self): + return {'times': self.times} + + def fork(self): + info = PathTraceInfo() + info.times.extend(self.times) + info.ticks.extend(self.ticks) + return info + +class TracePreOper: + + def __init__(self, trace, oper, genr): + self.genr = genr + self.oper = oper + self.trace = trace + + self.count = 0 + self.inittime = None + self.input = collections.defaultdict(int) + + async def wrapgenr(self): + + self.inittime = time.monotonic_ns() + async for node, path in self.genr: + + tick = time.monotonic_ns() + + if path.traceinfo is None: + path.traceinfo = PathTraceInfo() + + self.count += 1 + self.input[node.form.name] += 1 + path.traceinfo.ticks.append(tick) + + yield node, path + + def stats(self): + return {'time:init': self.inittime, 'input': dict(self.input)} + +class TracePostOper: + + def __init__(self, trace, oper, genr): + self.genr = genr + self.oper = oper + self.trace = trace + self.posinfo = None + + if oper is not None: + self.posinfo = oper.getPosInfo() + + self.count = 0 + self.finitime = 0 + self.lifted = collections.defaultdict(int) + self.output = collections.defaultdict(int) + + def stats(self): + return { + 'count': self.count, + 'lifted': dict(self.lifted), + 'output': dict(self.output), + 'time:fini': self.finitime, + } + + async def wrapgenr(self): + + async for node, path in self.genr: + + tock = time.monotonic_ns() + + self.count += 1 + self.output[node.form.name] += 1 + + if path.traceinfo is None: + # this storm operator added the node to the pipeline + self.lifted[node.form.name] += 1 + path.traceinfo = PathTraceInfo() + + if path.traceinfo.ticks: + tick = path.traceinfo.ticks.pop() + took = tock - tick + path.traceinfo.times.append((tick, tock, took, self.posinfo)) + + yield node, path + + self.finitime = time.monotonic_ns() + class Query(AstNode): def __init__(self, astinfo, kids=()): @@ -195,13 +318,28 @@ def __init__(self, astinfo, kids=()): async def run(self, runt, genr): - async with contextlib.AsyncExitStack() as stack: - for oper in self.kids: - genr = await stack.enter_async_context(contextlib.aclosing(oper.run(runt, genr))) + trace = None + if runt.opts.get('trace', False): + trace = Trace(runt) + genr = await trace.wrapgenr(None, genr) - async for node, path in genr: - runt.tick() - yield node, path + # FIXME discuss this change... + # async with contextlib.AsyncExitStack() as stack: + # for oper in self.kids: + # genr = await stack.enter_async_context(contextlib.aclosing(oper.run(runt, genr))) + + for oper in self.kids: + genr = await runt.withgenr(oper.run(runt, genr)) + + if trace is not None: + genr = await trace.wrapgenr(oper, genr) + + async for node, path in genr: + runt.tick() + yield node, path + + if trace is not None: + await runt.snap.fire('trace', report=trace.report()) async def iterNodePaths(self, runt, genr=None): diff --git a/synapse/lib/base.py b/synapse/lib/base.py index 1507a11bfe..a5bada0f42 100644 --- a/synapse/lib/base.py +++ b/synapse/lib/base.py @@ -175,6 +175,9 @@ async def enter_context(self, item): assert entr is not None return entr() + async def withgenr(self, genr): + return await self.enter_context(contextlib.aclosing(genr)) + def onfini(self, func): ''' Add a function/coroutine/Base to be called on fini(). diff --git a/synapse/lib/node.py b/synapse/lib/node.py index b14d01ad04..716e4479f9 100644 --- a/synapse/lib/node.py +++ b/synapse/lib/node.py @@ -708,10 +708,11 @@ class Path: ''' A path context tracked through the storm runtime. ''' - def __init__(self, vars, nodes): + def __init__(self, vars, nodes, traceinfo=None): self.node = None self.nodes = nodes + self.traceinfo = None if len(nodes): self.node = nodes[-1] @@ -772,7 +773,11 @@ def fork(self, node): nodes = list(self.nodes) nodes.append(node) - path = Path(self.vars.copy(), nodes) + traceinfo = None + if self.traceinfo: + traceinfo = self.traceinfo.fork() + + path = Path(self.vars.copy(), nodes, traceinfo=traceinfo) return path diff --git a/synapse/lib/snap.py b/synapse/lib/snap.py index 520d28970b..2d9009ccfb 100644 --- a/synapse/lib/snap.py +++ b/synapse/lib/snap.py @@ -757,6 +757,9 @@ async def iterStormPodes(self, text, opts, user=None): pode = node.pack(dorepr=dorepr) pode[1]['path'] = await path.pack(path=dopath) + if path.traceinfo is not None: + pode[1]['trace'] = path.traceinfo.pack() + if show_storage: pode[1]['storage'] = await node.getStorNodes() diff --git a/synapse/tests/test_lib_storm.py b/synapse/tests/test_lib_storm.py index 6ab334a428..225f0d1916 100644 --- a/synapse/tests/test_lib_storm.py +++ b/synapse/tests/test_lib_storm.py @@ -4888,3 +4888,10 @@ async def test_lib_dmon_embed(self): ''') self.none(await core.callStorm('return($lib.queue.gen(haha).get().1)')) + + async def test_lib_storm_trace(self): + + async with self.getTestCore() as core: + opts = {'trace': True} + msgs = await core.stormlist('[ inet:fqdn=vertex.link ] | sleep 0.1 | [ +#woot ]', opts=opts) + [print(repr(m)) for m in msgs]