Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: storm performance tracing #3821

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 144 additions & 6 deletions synapse/lib/ast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import types
import asyncio
import decimal
Expand Down Expand Up @@ -183,6 +184,128 @@

class LookList(AstNode): pass

class Trace:

def __init__(self, runt):
self.runt = runt
self.traceopers = []
# TODO how to track edits per oper?
# TODO ensure we can handle recursion
# and make sense of the resulting messages
# TODO decide how to specify / implement recursion

async def wrapgenr(self, oper, genr):
# sudo make me a genr sandwich
pref = TracePreOper(self, oper, genr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this generator sandwich is quite coming out correctly, I think both the pre/post trace operations are happening after the oper is actually run.

post = TracePostOper(self, oper, pref.wrapgenr())
self.traceopers.append((pref, oper, post))
return post.wrapgenr()

def report(self):
retn = []
for (pref, oper, post) in self.traceopers:

stats = pref.stats()
stats.update(post.stats())

retn.append({
'stats': stats,
'posinfo': post.posinfo,
})
return retn

class PathTraceInfo:

def __init__(self):
self.times = []
self.ticks = collections.deque()

def pack(self):
return {'times': self.times}

def fork(self):
info = PathTraceInfo()
info.times.extend(self.times)
info.ticks.extend(self.ticks)
return info

Check warning on line 230 in synapse/lib/ast.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/ast.py#L227-L230

Added lines #L227 - L230 were not covered by tests

class TracePreOper:

def __init__(self, trace, oper, genr):
self.genr = genr
self.oper = oper
self.trace = trace

self.count = 0
self.inittime = None
self.input = collections.defaultdict(int)

async def wrapgenr(self):

self.inittime = time.monotonic_ns()
async for node, path in self.genr:

tick = time.monotonic_ns()

if path.traceinfo is None:
path.traceinfo = PathTraceInfo()

self.count += 1
self.input[node.form.name] += 1
path.traceinfo.ticks.append(tick)

yield node, path

def stats(self):
return {'time:init': self.inittime, 'input': dict(self.input)}

class TracePostOper:

def __init__(self, trace, oper, genr):
self.genr = genr
self.oper = oper
self.trace = trace
self.posinfo = None

if oper is not None:
self.posinfo = oper.getPosInfo()

self.count = 0
self.finitime = 0
self.lifted = collections.defaultdict(int)
self.output = collections.defaultdict(int)

def stats(self):
return {
'count': self.count,
'lifted': dict(self.lifted),
'output': dict(self.output),
'time:fini': self.finitime,
}

async def wrapgenr(self):

async for node, path in self.genr:

tock = time.monotonic_ns()

self.count += 1
self.output[node.form.name] += 1

if path.traceinfo is None:
# this storm operator added the node to the pipeline
self.lifted[node.form.name] += 1
path.traceinfo = PathTraceInfo()

Check warning on line 298 in synapse/lib/ast.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/ast.py#L297-L298

Added lines #L297 - L298 were not covered by tests

if path.traceinfo.ticks:
tick = path.traceinfo.ticks.pop()
took = tock - tick
path.traceinfo.times.append((tick, tock, took, self.posinfo))

yield node, path

self.finitime = time.monotonic_ns()

class Query(AstNode):

def __init__(self, astinfo, kids=()):
Expand All @@ -195,13 +318,28 @@

async def run(self, runt, genr):

async with contextlib.AsyncExitStack() as stack:
for oper in self.kids:
genr = await stack.enter_async_context(contextlib.aclosing(oper.run(runt, genr)))
trace = None
if runt.opts.get('trace', False):
trace = Trace(runt)
genr = await trace.wrapgenr(None, genr)

async for node, path in genr:
runt.tick()
yield node, path
# FIXME discuss this change...
# async with contextlib.AsyncExitStack() as stack:
# for oper in self.kids:
# genr = await stack.enter_async_context(contextlib.aclosing(oper.run(runt, genr)))

for oper in self.kids:
genr = await runt.withgenr(oper.run(runt, genr))

if trace is not None:
genr = await trace.wrapgenr(oper, genr)

async for node, path in genr:
runt.tick()
yield node, path

if trace is not None:
await runt.snap.fire('trace', report=trace.report())

async def iterNodePaths(self, runt, genr=None):

Expand Down
3 changes: 3 additions & 0 deletions synapse/lib/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ async def enter_context(self, item):
assert entr is not None
return entr()

async def withgenr(self, genr):
return await self.enter_context(contextlib.aclosing(genr))

def onfini(self, func):
'''
Add a function/coroutine/Base to be called on fini().
Expand Down
9 changes: 7 additions & 2 deletions synapse/lib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,11 @@
'''
A path context tracked through the storm runtime.
'''
def __init__(self, vars, nodes):
def __init__(self, vars, nodes, traceinfo=None):

self.node = None
self.nodes = nodes
self.traceinfo = None

if len(nodes):
self.node = nodes[-1]
Expand Down Expand Up @@ -772,7 +773,11 @@
nodes = list(self.nodes)
nodes.append(node)

path = Path(self.vars.copy(), nodes)
traceinfo = None
if self.traceinfo:
traceinfo = self.traceinfo.fork()

Check warning on line 778 in synapse/lib/node.py

View check run for this annotation

Codecov / codecov/patch

synapse/lib/node.py#L778

Added line #L778 was not covered by tests

path = Path(self.vars.copy(), nodes, traceinfo=traceinfo)

return path

Expand Down
3 changes: 3 additions & 0 deletions synapse/lib/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,9 @@ async def iterStormPodes(self, text, opts, user=None):
pode = node.pack(dorepr=dorepr)
pode[1]['path'] = await path.pack(path=dopath)

if path.traceinfo is not None:
pode[1]['trace'] = path.traceinfo.pack()

if show_storage:
pode[1]['storage'] = await node.getStorNodes()

Expand Down
7 changes: 7 additions & 0 deletions synapse/tests/test_lib_storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4888,3 +4888,10 @@ async def test_lib_dmon_embed(self):
''')

self.none(await core.callStorm('return($lib.queue.gen(haha).get().1)'))

async def test_lib_storm_trace(self):

async with self.getTestCore() as core:
opts = {'trace': True}
msgs = await core.stormlist('[ inet:fqdn=vertex.link ] | sleep 0.1 | [ +#woot ]', opts=opts)
[print(repr(m)) for m in msgs]
Loading