Skip to content

Commit

Permalink
use nids in subgraph embeds
Browse files Browse the repository at this point in the history
  • Loading branch information
Cisphyx committed Nov 15, 2024
1 parent 5ef71a4 commit 50ff88b
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 81 deletions.
60 changes: 22 additions & 38 deletions synapse/lib/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ class SubGraph:
Nodes which were original seeds have path.meta('graph:seed').
All nodes have path.meta('edges') which is a list of (iden, info) tuples.
All nodes have path.meta('edges') which is a list of (nid, info) tuples.
'''

Expand Down Expand Up @@ -434,9 +434,8 @@ async def pivots(self, runt, node, path, existing):
link = {'type': 'prop', 'prop': propname}
yield (pivonode, path.fork(pivonode, link), link)

for iden in existing:
buid = s_common.uhex(iden)
othr = await node.view.getNodeByBuid(buid)
for nid in existing:
othr = await node.view.getNodeByNid(s_common.int64en(nid))
for propname, ndef in othr.getNodeRefs():
if ndef == node.ndef:
yield (othr, path, {'type': 'prop', 'prop': propname, 'reverse': True})
Expand All @@ -463,19 +462,19 @@ async def pivots(self, runt, node, path, existing):
yield (n, p, {'type': 'rules', 'scope': scope, 'index': indx})
indx += 1

async def _edgefallback(self, runt, results, resultidens, node):
async for nid01 in results:
async def _edgefallback(self, runt, results, node):
async for nid1 in results:
await asyncio.sleep(0)
iden01 = resultidens.get(nid01)
intnid1 = s_common.int64un(nid1)

async for verb in node.iterEdgeVerbs(nid01):
async for verb in node.iterEdgeVerbs(nid1):
await asyncio.sleep(0)
yield (iden01, {'type': 'edge', 'verb': verb})
yield (intnid1, {'type': 'edge', 'verb': verb})

# for existing nodes, we need to add n2 -> n1 edges in reverse
async for verb in runt.view.iterEdgeVerbs(nid01, node.nid):
async for verb in runt.view.iterEdgeVerbs(nid1, node.nid):
await asyncio.sleep(0)
yield (iden01, {'type': 'edge', 'verb': verb, 'reverse': True})
yield (intnid1, {'type': 'edge', 'verb': verb, 'reverse': True})

async def run(self, runt, genr):

Expand All @@ -499,20 +498,15 @@ async def run(self, runt, genr):
done = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
intodo = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
results = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
resultsidens = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))
revpivs = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))

revedge = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))
n1delayed = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))

# load the existing graph as already done
for iden in existing:
nid = runt.view.core.getNidByBuid(s_common.uhex(iden))
if nid is None:
continue

for nid in existing:
nid = s_common.int64en(nid)
await results.add(nid)
await resultsidens.set(nid, iden)

if doedges:
if runt.view.getEdgeCount(nid) > edgelimit:
Expand All @@ -535,10 +529,6 @@ async def run(self, runt, genr):

await revedge.set(n2nid, re)

if not resultsidens.get(n2nid):
n2iden = s_common.ehex(runt.view.core.getBuidByNid(n2nid))
await resultsidens.set(n2nid, n2iden)

async def todogenr():

async for node, path in genr:
Expand Down Expand Up @@ -576,18 +566,18 @@ async def todogenr():
# we must traverse the pivots for the node *regardless* of degrees
# due to needing to tie any leaf nodes to nodes that were already yielded

nodeiden = node.iden()
intnid = s_common.int64un(node.nid)
edges = list(revpivs.get(nid, defv=()))
async for pivn, pivp, pinfo in self.pivots(runt, node, path, existing):

await asyncio.sleep(0)

if results.has(pivn.nid):
edges.append((pivn.iden(), pinfo))
edges.append((s_common.int64un(pivn.nid), pinfo))
else:
pinfo['reverse'] = True
pivedges = revpivs.get(pivn.nid, defv=())
await revpivs.set(pivn.nid, pivedges + ((nodeiden, pinfo),))
await revpivs.set(pivn.nid, pivedges + ((intnid, pinfo),))

# we dont pivot from omitted nodes
if omitted:
Expand All @@ -602,7 +592,7 @@ async def todogenr():
continue

# no need to pivot to existing nodes
if pivn.iden() in existing:
if s_common.int64un(pivn.nid) in existing:
continue

# do we have room to go another degree out?
Expand All @@ -612,13 +602,12 @@ async def todogenr():

if doedges:
await results.add(nid)
await resultsidens.set(nid, nodeiden)

if runt.view.getEdgeCount(nid) > edgelimit:
# The current node in the pipeline has too many edges from it, so it's
# less prohibitive to just check against the graph
await n1delayed.add(nid)
async for e in self._edgefallback(runt, results, resultsidens, node):
async for e in self._edgefallback(runt, results, node):
edges.append(e)

else:
Expand All @@ -636,28 +625,23 @@ async def todogenr():

await revedge.set(n2nid, re)

if not resultsidens.get(n2nid):
n2iden = s_common.ehex(runt.view.core.getBuidByNid(n2nid))
await resultsidens.set(n2nid, n2iden)

if n2nid in results:
n2iden = resultsidens.get(n2nid)
edges.append((n2iden, {'type': 'edge', 'verb': verb}))
edges.append((s_common.int64un(n2nid), {'type': 'edge', 'verb': verb}))

if revedge.has(nid):
for n2nid, verbs in revedge.get(nid).items():
n2iden = resultsidens.get(n2nid)
n2intnid = s_common.int64un(n2nid)

for verb in verbs:
await asyncio.sleep(0)
edges.append((n2iden, {'type': 'edge', 'verb': verb, 'reverse': True}))
edges.append((n2intnid, {'type': 'edge', 'verb': verb, 'reverse': True}))

async for n1nid in n1delayed:
n1iden = resultsidens.get(n1nid)
n1intnid = s_common.int64un(n1nid)

async for verb in runt.view.iterEdgeVerbs(n1nid, nid):
await asyncio.sleep(0)
edges.append((n1iden, {'type': 'edge', 'verb': verb, 'reverse': True}))
edges.append((n1intnid, {'type': 'edge', 'verb': verb, 'reverse': True}))

path.metadata['edges'] = edges
yield node, path
Expand Down
2 changes: 1 addition & 1 deletion synapse/lib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def walk(n, p):
embdnode = retn.get(nodepath)
if embdnode is None:
embdnode = retn[nodepath] = {}
embdnode['*'] = s_common.ehex(node.buid)
embdnode['*'] = s_common.int64un(node.nid)

for relp in relprops:
embdnode[relp] = node.get(relp)
Expand Down
6 changes: 2 additions & 4 deletions synapse/lib/storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2311,12 +2311,10 @@ async def execute(self, genr=None):
async def _joinEmbedStor(self, storage, embeds):
for nodePath, relProps in embeds.items():
await asyncio.sleep(0)
iden = relProps.get('*')
if not iden:
if (nid := relProps.get('*')) is None:
continue

if (nid := self.view.core.getNidByBuid(s_common.uhex(iden))) is None:
continue
nid = s_common.int64en(nid)

stor = await self.view.getStorNodes(nid)
for relProp in relProps.keys():
Expand Down
4 changes: 2 additions & 2 deletions synapse/tests/test_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2967,11 +2967,11 @@ async def test_storm_graph(self):
self.len(0, node[1]['path']['edges'])
elif node[0][0] == 'inet:ip':
self.eq(node[1]['path']['edges'], (
('3732000e26dc7b6c0ad44dbb13e28d9401d4f8280f091461f6f6dd9a63c53124', {'type': 'prop', 'prop': 'ip', 'reverse': True}),
(1, {'type': 'prop', 'prop': 'ip', 'reverse': True}),
))
elif node[0] == ('inet:fqdn', 'woot.com'):
self.eq(node[1]['path']['edges'], (
('3732000e26dc7b6c0ad44dbb13e28d9401d4f8280f091461f6f6dd9a63c53124', {'type': 'prop', 'prop': 'fqdn', 'reverse': True}),
(1, {'type': 'prop', 'prop': 'fqdn', 'reverse': True}),
))

async def test_onadd(self):
Expand Down
63 changes: 29 additions & 34 deletions synapse/tests/test_lib_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -3200,22 +3200,19 @@ async def test_ast_subgraph_2pass(self):
msgs = await core.stormlist('media:news inet:ip', opts={'graph': True})
nodes = [m[1] for m in msgs if m[0] == 'node']
self.len(2, nodes)
self.eq(nodes[1][1]['path']['edges'], (('8f66c747665dc3f16603bb25c78323ede90086d255ac07176a98a579069c4bb6',
{'type': 'edge', 'verb': 'refs', 'reverse': True}),))
self.eq(nodes[1][1]['path']['edges'], ((1, {'type': 'edge', 'verb': 'refs', 'reverse': True}),))

opts = {'graph': {'existing': (news.iden(),)}}
opts = {'graph': {'existing': (s_common.int64un(news.nid),)}}
msgs = await core.stormlist('inet:ip', opts=opts)
nodes = [m[1] for m in msgs if m[0] == 'node']
self.len(1, nodes)
self.eq(nodes[0][1]['path']['edges'], (('8f66c747665dc3f16603bb25c78323ede90086d255ac07176a98a579069c4bb6',
{'type': 'edge', 'verb': 'refs', 'reverse': True}),))
self.eq(nodes[0][1]['path']['edges'], ((1, {'type': 'edge', 'verb': 'refs', 'reverse': True}),))

opts = {'graph': {'existing': (ipv4.iden(),)}}
opts = {'graph': {'existing': (s_common.int64un(ipv4.nid),)}}
msgs = await core.stormlist('media:news', opts=opts)
nodes = [m[1] for m in msgs if m[0] == 'node']
self.len(1, nodes)
self.eq(nodes[0][1]['path']['edges'], (('6ff89ac24110dec0216d5ce85382056ed50f508dbf718764039f061fc190b3c8',
{'type': 'edge', 'verb': 'refs'}),))
self.eq(nodes[0][1]['path']['edges'], ((2, {'type': 'edge', 'verb': 'refs'}),))

msgs = await core.stormlist('media:news inet:ip', opts={'graph': {'maxsize': 1}})
self.len(1, [m[1] for m in msgs if m[0] == 'node'])
Expand Down Expand Up @@ -3243,16 +3240,16 @@ async def test_ast_subgraph_caching(self):
await core.nodes('[test:str=neato +(_selfrefs)> { test:str=neato }]')
self.len(1, neato)

iden = neato[0].iden()
idens = [iden,]
intnid = s_common.int64un(neato[0].nid)
nids = [intnid,]
opts = {
'graph': {
'degrees': None,
'edges': True,
'refs': True,
'existing': idens
'existing': nids
},
'idens': idens
'nids': nids
}

def testedges(msgs):
Expand All @@ -3263,8 +3260,8 @@ def testedges(msgs):
node = m[1]
edges = node[1]['path']['edges']
self.len(1, edges)
edgeiden, edgedata = edges[0]
self.eq(edgeiden, iden)
edgenid, edgedata = edges[0]
self.eq(edgenid, intnid)
self.true(edgedata.get('reverse', False))
self.eq(edgedata['verb'], 'refs')
self.eq(edgedata['type'], 'edge')
Expand All @@ -3281,17 +3278,16 @@ def testedges(msgs):
burrito = await core.nodes('[test:str=burrito <(_awesome)+ { inet:ip }]')
self.len(1, burrito)

iden = burrito[0].iden()
for m in msgs:
if m[0] != 'node':
continue
node = m[1]
idens.append(node[1]['iden'])
nids.append(node[1]['nid'])

opts['graph']['existing'] = idens
opts['idens'] = [ipv4s[0].iden(),]
ipidens = [n.iden() for n in ipv4s]
ipidens.append(neato[0].iden())
opts['graph']['existing'] = nids
opts['nids'] = [s_common.int64un(ipv4s[0].nid),]
ipnids = [s_common.int64un(n.nid) for n in ipv4s]
ipnids.append(s_common.int64un(neato[0].nid))
for limit in limits:
opts['graph']['edgelimit'] = limit
msgs = await core.stormlist('tee { --> * } { <-- * }', opts=opts)
Expand All @@ -3303,8 +3299,8 @@ def testedges(msgs):
self.len(256, edges)

for edge in edges:
edgeiden, edgedata = edge
self.isin(edgeiden, ipidens)
edgenid, edgedata = edge
self.isin(edgenid, ipnids)
self.true(edgedata.get('reverse', False))
self.eq(edgedata['verb'], '_awesome')
self.eq(edgedata['type'], 'edge')
Expand All @@ -3314,17 +3310,17 @@ def testedges(msgs):
self.len(256, edges)
edges = node[1]['path']['edges']
for edge in edges:
edgeiden, edgedata = edge
self.isin(edgeiden, ipidens)
edgenid, edgedata = edge
self.isin(edgenid, ipnids)
self.eq(edgedata['type'], 'edge')
if edgedata['verb'] == '_selfrefs':
self.eq(edgeiden, neato[0].iden())
self.eq(edgenid, s_common.int64un(neato[0].nid))
else:
self.eq(edgedata['verb'], 'refs')
self.false(edgedata.get('reverse', False))

opts['graph'].pop('existing', None)
opts['idens'] = [neato[0].iden(),]
opts['nids'] = [s_common.int64un(neato[0].nid),]
for limit in limits:
opts['graph']['edgelimit'] = limit
msgs = await core.stormlist('tee { --> * } { <-- * }', opts=opts)
Expand All @@ -3341,9 +3337,9 @@ def testedges(msgs):
elif form == 'test:str':
self.len(258, edges)
for e in edges:
self.isin(e[0], ipidens)
self.isin(e[0], ipnids)
self.eq('edge', e[1]['type'])
if e[0] == neato[0].iden():
if e[0] == s_common.int64un(neato[0].nid):
selfrefs += 1
self.eq('_selfrefs', e[1]['verb'])
else:
Expand All @@ -3353,8 +3349,7 @@ def testedges(msgs):
boop = await core.nodes('[test:str=boop +(refs)> {[inet:ip=5.6.7.0/24]}]')
await core.nodes('[test:str=boop <(refs)+ {[inet:ip=4.5.6.0/24]}]')
self.len(1, boop)
boopiden = boop[0].iden()
opts['idens'] = [boopiden,]
opts['nids'] = [s_common.int64un(boop[0].nid),]
for limit in limits:
opts['graph']['edgelimit'] = limit
msgs = await core.stormlist('tee --join { --> * } { <-- * }', opts=opts)
Expand All @@ -3364,24 +3359,24 @@ async def test_ast_subgraph_existing_prop_edges(self):

async with self.getTestCore() as core:
(fn,) = await core.nodes('[ file:bytes=(woot,) :md5=e5a23e8a2c0f98850b1a43b595c08e63 ]')
fiden = fn.iden()
fnid = s_common.int64un(fn.nid)

rules = {
'degrees': None,
'edges': True,
'refs': True,
'existing': [fiden]
'existing': [fnid]
}

nodes = []

async for node in core.view.iterStormPodes(':md5 -> hash:md5', opts={'idens': [fiden], 'graph': rules}):
async for node in core.view.iterStormPodes(':md5 -> hash:md5', opts={'nids': [fnid], 'graph': rules}):
nodes.append(node)

edges = node[1]['path'].get('edges')
self.len(1, edges)
self.eq(edges, [
[fn.iden(), {
[s_common.int64un(fn.nid), {
"type": "prop",
"prop": "md5",
"reverse": True
Expand Down
4 changes: 2 additions & 2 deletions synapse/tests/test_lib_storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2056,15 +2056,15 @@ async def test_storm_embeds(self):

node = nodes[0]
self.eq('hehe', node[1]['embeds']['asn']['name'])
self.eq('796d67b92a6ffe9b88fa19d115b46ab6712d673a06ae602d41de84b1464782f2', node[1]['embeds']['asn']['*'])
self.eq(1, node[1]['embeds']['asn']['*'])

opts = {'embeds': {'ou:org': {'hq::email': ('user',)}}}
msgs = await core.stormlist('[ ou:org=* :country=* :hq=* ] { -> ps:contact [ :[email protected] ] }', opts=opts)
nodes = [m[1] for m in msgs if m[0] == 'node']
node = nodes[0]

self.eq('visi', node[1]['embeds']['hq::email']['user'])
self.eq('2346d7bed4b0fae05e00a413bbf8716c9e08857eb71a1ecf303b8972823f2899', node[1]['embeds']['hq::email']['*'])
self.eq(6, node[1]['embeds']['hq::email']['*'])

fork = await core.callStorm('return($lib.view.get().fork().iden)')

Expand Down

0 comments on commit 50ff88b

Please sign in to comment.