Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: AHA Gather API #3795

Open
wants to merge 103 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
933e1aa
WIP: step1: aha:network default
invisig0th Jun 27, 2024
cba0bfe
wip
invisig0th Jun 27, 2024
8697e4a
wip
invisig0th Jun 28, 2024
c9a9a9d
wip
invisig0th Jun 28, 2024
c15ba7c
wip
invisig0th Jun 28, 2024
9587d08
Merge branch 'master' into visi-aha-defnet
invisig0th Jun 28, 2024
9ac5fbf
wip
invisig0th Jun 29, 2024
0f6cdd6
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jun 29, 2024
4b69909
wip
invisig0th Jul 1, 2024
f1d1cad
wip
invisig0th Jul 1, 2024
0a52e07
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 5, 2024
9c18de4
wip
invisig0th Jul 5, 2024
a1664e5
wip
invisig0th Jul 5, 2024
df24baa
wip
invisig0th Jul 5, 2024
b5a543e
wip
invisig0th Jul 5, 2024
d891c82
wip
invisig0th Jul 5, 2024
f31f1af
wip
invisig0th Jul 5, 2024
42697fa
wip
invisig0th Jul 5, 2024
7e2fb64
wip
invisig0th Jul 5, 2024
1d5e0df
wip
invisig0th Jul 5, 2024
ff884dc
wip
invisig0th Jul 5, 2024
562cea7
wip
invisig0th Jul 5, 2024
7e59310
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 5, 2024
0fc3257
wip
invisig0th Jul 5, 2024
27cfd9f
improve coverage
invisig0th Jul 5, 2024
b9bb3f0
wip
invisig0th Jul 6, 2024
92b498d
wip
invisig0th Jul 6, 2024
40c5e04
wip
invisig0th Jul 7, 2024
2c063ed
wip
invisig0th Jul 7, 2024
08d3534
wip
invisig0th Jul 7, 2024
9750935
wip
invisig0th Jul 8, 2024
4f13a21
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 8, 2024
2ee4285
duh
invisig0th Jul 8, 2024
1e715c9
wip
invisig0th Jul 8, 2024
b01d3bf
speed up tests
invisig0th Jul 8, 2024
15a57d9
AHA Gather APIs
invisig0th Jul 8, 2024
e18708e
wip
invisig0th Jul 8, 2024
be760d2
Update docs/synapse/deploymentguide.rst
invisig0th Jul 9, 2024
f0f8a4f
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 9, 2024
36acd8d
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 9, 2024
a7c5e3d
wip
invisig0th Jul 9, 2024
90b7959
wip
invisig0th Jul 9, 2024
25af8b6
wip
invisig0th Jul 9, 2024
e4dea7b
make timeout message a bit prettier
invisig0th Jul 10, 2024
c7f1973
Update synapse/lib/aha.py
invisig0th Jul 10, 2024
f62ee42
Do container branch build
vEpiphyte Jul 10, 2024
5954fa4
Update docker test script for required value.
vEpiphyte Jul 12, 2024
6b0f3a9
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 12, 2024
c7145e3
wip
invisig0th Jul 12, 2024
de05d16
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
f2c4fae
wip
invisig0th Jul 15, 2024
0c55fd9
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
3badfdc
wip
invisig0th Jul 15, 2024
4533440
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
cbc39e3
Fix fstrings
vEpiphyte Jul 15, 2024
020f7a1
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 15, 2024
7fb5d58
Fix fstring for clone; add --only-url
vEpiphyte Jul 15, 2024
0211b3e
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 17, 2024
830e3de
Merge branch 'visi-aha-defnet' into visi-aha-gather
invisig0th Jul 17, 2024
dc88406
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 17, 2024
760331e
wip
invisig0th Jul 17, 2024
e9e4294
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jul 17, 2024
9337d83
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 22, 2024
efa7d02
wip
invisig0th Jul 23, 2024
03b4b71
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 24, 2024
fe2e583
wip
invisig0th Jul 24, 2024
2f06751
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 25, 2024
95774e6
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 26, 2024
21ab327
Visi aha defnet epiphyte (#3823)
vEpiphyte Jul 26, 2024
dc286f4
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 29, 2024
0dd367b
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 30, 2024
7a91deb
wip
invisig0th Jul 30, 2024
728da28
wip
invisig0th Jul 30, 2024
37983b2
wip
invisig0th Jul 30, 2024
8d07930
Remove logger statement about username hinting on aha lookup
vEpiphyte Jul 31, 2024
891fc22
Restore ahaname logging during promote/handoff API calls when it is a…
vEpiphyte Jul 31, 2024
a2d5d0b
Tweak flat network docs slightly
vEpiphyte Jul 31, 2024
252599d
changelog entries
invisig0th Jul 31, 2024
e379042
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jul 31, 2024
36d8054
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 31, 2024
26c5127
Update changes/6ebc22454e67c26ce57ea4533441c9fc.yaml
invisig0th Jul 31, 2024
55d8394
Update docs/synapse/deploymentguide.rst
invisig0th Jul 31, 2024
7da0c6e
Update docs/synapse/deploymentguide.rst
invisig0th Jul 31, 2024
e9af9a3
Update synapse/lib/cell.py
invisig0th Jul 31, 2024
502d23d
wip
invisig0th Aug 1, 2024
6368eb1
Merge branch 'master' into visi-aha-defnet
invisig0th Aug 1, 2024
37a7a85
Update docs/synapse/deploymentguide.rst
invisig0th Aug 2, 2024
650a891
Merge branch 'master' into visi-aha-defnet
invisig0th Aug 5, 2024
221c823
wip
invisig0th Aug 5, 2024
f3f3ae4
merge from visi-aha-defnet
invisig0th Aug 6, 2024
ca65c6e
merge from master
invisig0th Aug 6, 2024
23a9463
wip
invisig0th Aug 7, 2024
2969523
Merge branch 'master' into visi-aha-gather
invisig0th Aug 20, 2024
301813e
wip
invisig0th Aug 20, 2024
fb6a92f
wip
invisig0th Aug 21, 2024
a4b5ecb
Merge branch 'master' into visi-aha-gather
invisig0th Aug 23, 2024
e6248eb
wip
invisig0th Aug 23, 2024
e0212f8
Merge branch 'master' into visi-aha-gather
invisig0th Aug 26, 2024
2af006b
wip
invisig0th Aug 26, 2024
2179bea
Merge branch 'master' into visi-aha-gather
invisig0th Sep 5, 2024
c9f3db3
wip
invisig0th Sep 5, 2024
770a980
wip
invisig0th Sep 10, 2024
59c6ffe
Merge branch 'master' into visi-aha-gather
invisig0th Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions synapse/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,30 @@ async def wait_for(fut, timeout):
async with _timeout(timeout):
return await fut

async def waitretn(futu, timeout):
try:
valu = await wait_for(futu, timeout)
return (True, valu)
except Exception as e:
return (False, excinfo(e))

async def waitgenr(genr, timeout):

genr = genr.__aiter__()
try:
while True:
retn = await waitretn(genr.__anext__(), timeout)

if not retn[0] and retn[1]['err'] == 'StopAsyncIteration':
return

yield retn

if not retn[0]:
return
finally:
await genr.aclose()

def _release_waiter(waiter, *args):
if not waiter.done():
waiter.set_result(None)
Expand Down
1 change: 1 addition & 0 deletions synapse/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ async def sessfini():
link.set('sess', sess)

if isinstance(item, s_telepath.Aware):
reply[1]['features'] = await item.getTeleFeats()
item = await s_coro.ornot(item.getTeleApi, link, mesg, path)
if isinstance(item, s_base.Base):
link.onfini(item)
Expand Down
203 changes: 202 additions & 1 deletion synapse/lib/aha.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import random
import asyncio
import logging
import contextlib
import collections

import cryptography.x509 as c_x509
Expand Down Expand Up @@ -136,6 +137,35 @@ async def getAhaUrls(self, user='root'):
return ()
return ahaurls

@s_cell.adminapi()
async def runGatherCall(self, iden, todo, timeout=None, skiprun=None):
async for item in self.cell.runGatherCall(iden, todo, timeout=timeout, skiprun=skiprun):
yield item

@s_cell.adminapi()
async def runGatherGenr(self, iden, todo, timeout=None, skiprun=None):
async for item in self.cell.runGatherGenr(iden, todo, timeout=timeout, skiprun=skiprun):
yield item

async def addNexsTracker(self, name, iden, network=None):
'''
Add an explicit nexus tracker which consumes the transactions
produced by the iden leader.
'''
return await self.cell.addNexsTracker(name, iden, network=network)

async def delNexsTracker(self, name, iden, network=None):
'''
Remove a nexus tracker entry.
'''
return await self.cell.delNexsTracker(name, iden, network=network)

async def getNexsTrackers(self, iden):
# services which have the same iden are all automatically trackers
# explicitly registered trackers are also yielded here
async for item in self.cell.getNexsTrackers(iden):
yield item

async def getAhaSvc(self, name, filters=None):
'''
Return an AHA service description dictionary for a service name.
Expand Down Expand Up @@ -366,6 +396,10 @@ async def clearAhaClones(self):
'''
return await self.cell.clearAhaClones()

@s_cell.adminapi()
async def getAhaSvcPeerTasks(self, iden, timeout=None, skiprun=None):
async for task in self.cell.getAhaSvcPeerTasks(iden, timeout=timeout, skiprun=skiprun):
yield task

class ProvDmon(s_daemon.Daemon):

Expand Down Expand Up @@ -571,7 +605,6 @@ async def _initCellBoot(self):

async def initServiceStorage(self):

# TODO plumb using a remote jsonstor?
dirn = s_common.gendir(self.dirn, 'slabs', 'jsonstor')

slab = await s_lmdbslab.Slab.anit(dirn)
Expand Down Expand Up @@ -682,8 +715,135 @@ def _initCellHttpApis(self):
self.addHttpApi('/api/v1/aha/services', AhaServicesV1, {'cell': self})
self.addHttpApi('/api/v1/aha/provision/service', AhaProvisionServiceV1, {'cell': self})

async def callAhaSvcApi(self, name, todo, timeout=None):
name = self._getAhaName(name)
svcdef = await self._getAhaSvc(name)
return self._callAhaSvcApi(svcdef, todo, timeout=timeout)

async def _callAhaSvcApi(self, svcdef, todo, timeout=None):
try:
proxy = await self.getAhaSvcProxy(svcdef, timeout=timeout)
meth = getattr(proxy, todo[0])
return await s_common.waitretn(meth(*todo[1], **todo[2]), timeout=timeout)
except Exception as e:
# in case proxy construction fails
return (False, s_common.excinfo(e))

async def callAhaSvcGenr(self, name, todo, timeout=None):
name = self._getAhaName(name)
svcdef = await self._getAhaSvc(name)

async def _callAhaSvcGenr(self, svcdef, todo, timeout=None):
try:
proxy = await self.getAhaSvcProxy(svcdef, timeout=timeout)
meth = getattr(proxy, todo[0])
async for item in s_common.waitgenr(meth(*todo[1], **todo[2]), timeout=timeout):
yield item
except Exception as e:
# in case proxy construction fails
yield (False, s_common.excinfo(e))

async def getAhaSvcPeerTasks(self, iden, timeout=None, skiprun=None):
todo = s_common.todo('getTasks')
async for item in self.runGatherGenr(iden, todo, timeout=timeout, skiprun=skiprun):
yield item

async def getAhaSvcsByIden(self, iden, online=True, skiprun=None):

runs = set()
async for svcdef in self.getAhaSvcs():
await asyncio.sleep(0)

# TODO services by iden indexes!
if svcdef['svcinfo'].get('iden') != iden:
continue

if online and svcdef['svcinfo'].get('online') is None:
continue

svcrun = svcdef['svcinfo'].get('run')
if svcrun in runs:
continue

if skiprun == svcrun:
continue

runs.add(svcrun)
yield svcdef

def getAhaSvcUrl(self, svcdef, user='root'):
svcfull = svcdef.get('name')
svcnetw = svcdef.get('svcnetw')
host = svcdef['svcinfo']['urlinfo']['host']
port = svcdef['svcinfo']['urlinfo']['port']
return f'ssl://{host}:{port}?hostname={svcfull}&certname={user}@{svcnetw}'

async def runGatherCall(self, iden, todo, timeout=None, skiprun=None):

if not self.isactive:
proxy = await self.nexsroot.client.proxy(timeout=timeout)
async for item in proxy.runGatherCall(iden, todo, timeout=timeout, skiprun=skiprun):
yield item

queue = asyncio.Queue()
async with await s_base.Base.anit() as base:

async def call(svcdef):
svcfull = svcdef.get('name')
await queue.put((svcfull, await self._callAhaSvcApi(svcdef, todo, timeout=timeout)))

count = 0
async for svcdef in self.getAhaSvcsByIden(iden, skiprun=skiprun):
count += 1
base.schedCoro(call(svcdef))

for i in range(count):
yield await queue.get()

async def runGatherGenr(self, iden, todo, timeout=None, skiprun=None):

if not self.isactive:
proxy = await self.nexsroot.client.proxy(timeout=timeout)
async for item in proxy.runGatherGenr(iden, todo, timeout=timeout, skiprun=skiprun):
yield item

queue = asyncio.Queue()
async with await s_base.Base.anit() as base:

async def call(svcdef):
svcfull = svcdef.get('name')
try:
async for item in self._callAhaSvcGenr(svcdef, todo, timeout=timeout):
await queue.put((svcfull, item))
finally:
await queue.put(None)

count = 0
async for svcdef in self.getAhaSvcsByIden(iden, skiprun=skiprun):
count += 1
base.schedCoro(call(svcdef))

while count > 0:

item = await queue.get()
if item is None:
count -= 1
continue

yield item

async def _finiSvcClients(self):
for client in list(self.clients.values()):
await client.fini()

async def initServicePassive(self):
await self._finiSvcClients()

async def initServiceRuntime(self):

self.clients = {}
self.onfini(self._finiSvcClients)

self.addActiveCoro(self._clearInactiveSessions)

if self.isactive:
Expand Down Expand Up @@ -897,6 +1057,43 @@ async def addAhaSvc(self, name, info, network=None):
await self.fire('aha:svcadd', svcinfo=svcinfo)
await self.fire(f'aha:svcadd:{svcfull}', svcinfo=svcinfo)

async def reqAhaSvcProxy(self, svcdef, timeout=None):

proxy = await self.getAhaSvcProxy(svcdef, timeout=timeout)
if proxy is not None:
return proxy

mesg = f'The service is not ready {svcfull}.'
raise s_exc.NotReady(mesg=mesg)

async def getAhaSvcProxy(self, svcdef, timeout=None):

assert self.isactive
client = await self.getAhaSvcClient(svcdef)
if client is None:
return None

return await client.proxy(timeout=timeout)

async def getAhaSvcClient(self, svcdef):

assert self.isactive

svcfull = svcdef.get('name')

client = self.clients.get(svcfull)
if client is not None:
return client

svcurl = self.getAhaSvcUrl(svcdef)

client = self.clients[svcfull] = await s_telepath.ClientV2.anit(svcurl)
async def fini():
self.clients.pop(svcfull, None)

client.onfini(fini)
return client

def _getAhaName(self, name):
# the modern version of names is absolute or ...
if name.endswith('...'):
Expand Down Expand Up @@ -1066,6 +1263,10 @@ async def _setAhaSvcDown(self, name, linkiden, network=None):
logger.info(f'Set [{svcfull}] offline.',
extra=await self.getLogExtra(name=svcname, netw=svcnetw))

client = self.clients.pop(svcfull, None)
if client is not None:
await client.fini()

async def getAhaSvc(self, name, filters=None):

name = self._getAhaName(name)
Expand Down
Loading
Loading