Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Optimize a Slab while it's running #4003

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions synapse/lib/fifofile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import io
import os
import asyncio

import msgpack

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.const as s_const
import synapse.lib.msgpack as s_msgpack

class FifoFile(s_base.Base):
    '''
    Use a file as an async FIFO.

    put() msgpack-encodes an item and appends it to a file in a temporary
    directory; get() decodes items back out in the order they were written,
    waiting when every written item has already been returned.
    '''
    async def __anit__(self):

        await s_base.Base.__anit__(self)

        # size: bytes appended so far; offset: bytes already fed to the unpacker
        self.size = 0
        self.offset = 0

        # count: items written by put() that get() has not returned yet
        self.count = 0
        self.yielded = 0

        self.lock = asyncio.Lock()
        self.readable = asyncio.Event()
        self.unpacker = s_msgpack.unpacker()

        self.tempdir = self.enter_context(s_common.getTempDir())
        self.fifopath = s_common.genpath(self.tempdir, 'fifo.mpk')
        # Opened read/write ('w+b'): get() reads back through the same descriptor.
        self.fifofile = await s_coro.executor(io.open, self.fifopath, 'w+b')
        self.fifofd = self.fifofile.fileno()

        async def fini():
            # Wake any get() parked on the event so it can observe isfini.
            self.readable.set()
            # Take the lock so the descriptor is not closed under an in-flight read/write.
            async with self.lock:
                self.fifofile.close()

        self.onfini(fini)

    async def put(self, item):
        '''
        Append one item to the FIFO.

        Args:
            item: Any value s_msgpack.en() can encode.
        '''
        byts = memoryview(s_msgpack.en(item))
        async with self.lock:
            # os.pwrite() may write fewer bytes than requested; loop until the
            # whole encoded item is on disk at the current end offset.
            while len(byts):
                sent = await s_coro.executor(os.pwrite, self.fifofd, byts, self.size)
                self.size += sent
                byts = byts[sent:]
            self.count += 1
            self.readable.set()

    async def get(self):
        '''
        Return the next item from the FIFO, waiting until one is available.

        Raises:
            s_exc.IsFini: If the FIFO has been finalized.
        '''
        while True:

            if self.isfini:
                raise s_exc.IsFini()

            async with self.lock:

                while True:

                    try:
                        item = self.unpacker.unpack()
                    except msgpack.exceptions.OutOfData:
                        pass
                    else:
                        self.count -= 1
                        return item

                    if self.offset >= self.size:
                        # Caught up with the writer: arm the event so the wait
                        # below blocks until the next put() (or fini).
                        # TODO if we catch up, truncate the file?
                        if not self.isfini:
                            self.readable.clear()
                        break

                    byts = await s_coro.executor(os.pread, self.fifofd, s_const.mebibyte, self.offset)
                    self.offset += len(byts)
                    self.unpacker.feed(byts)

            await self.readable.wait()
183 changes: 171 additions & 12 deletions synapse/lib/lmdbslab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import asyncio
import threading
import contextlib
import collections

import logging
Expand All @@ -19,6 +20,7 @@
import synapse.lib.const as s_const
import synapse.lib.nexus as s_nexus
import synapse.lib.msgpack as s_msgpack
import synapse.lib.fifofile as s_fifofile
import synapse.lib.thishost as s_thishost
import synapse.lib.thisplat as s_thisplat
import synapse.lib.slabseqn as s_slabseqn
Expand Down Expand Up @@ -762,6 +764,101 @@ def _mapsizeround(size):

return _roundup(size, MAX_DOUBLE_SIZE)

class Optimizer(s_base.Base):
    '''
    Build a compacted copy of a running Slab and swap it into place.

    The sequence driven by optimize() is:
        1. copy the slab's LMDB environment (compact=True) to "<path>_optimizing"
        2. replay every transaction committed since the copy (captured through
           writeahead() into a FifoFile) onto the copy
        3. replace the slab's directory with the copy and reopen the slab

    NOTE(review): writes made to the source slab between the end of the replay
    and the directory swap are not captured by the visible code - confirm the
    caller prevents them.
    '''
    async def __anit__(self, slab, dirn=None):
        '''
        Args:
            slab (Slab): The slab to optimize; this Optimizer is finalized with it.
            dirn: Unused by the visible code; kept for interface compatibility.
        '''
        await s_base.Base.__anit__(self)

        self.slab = slab
        self.slab.onfini(self)

        self.size = 0
        # count: transactions handed to writeahead(); synced: transactions
        # replayed onto the copy. _catchup() is done when they are equal.
        self.count = 0
        self.synced = 0

        self.fifo = await s_fifofile.FifoFile.anit()

        self.optslab = None
        self.optpath = f'{self.slab.path}_optimizing'

        async def fini():

            await self.fifo.fini()
            if self.optslab is not None:
                await self.optslab.fini()
            # After a successful swap the path has been renamed away, so this is a no-op.
            shutil.rmtree(self.optpath, ignore_errors=True)

        self.onfini(fini)

    def writeahead(self, xactops):
        '''
        Record one committed transaction so it can be replayed onto the copy.

        This is called synchronously from the slab's commit path, so the file
        write itself is scheduled as a task rather than awaited here.

        Args:
            xactops (list): (func, args, kwargs) tuples as logged by the slab.
        '''
        if not xactops:
            return

        packops = []
        for (func, args, kwargs) in xactops:
            # Bound methods cannot be serialized; store the index into xactfuncs instead.
            indx = self.slab.xactlooks.get(func)
            assert indx is not None
            packops.append((indx, args, kwargs))

        # Count synchronously so _catchup() cannot finish while a write is still queued.
        self.count += 1
        self.schedCoro(self._putops(packops))

    async def _putops(self, packops):
        # Write one packed transaction to the FIFO; on failure abort the optimization.
        try:
            await self.fifo.put(packops)
        except Exception:
            logger.exception('Optimize Failed On')
            # Finalizing the FIFO makes a pending/future fifo.get() raise, which
            # optimize() reports and cleans up after.
            await self.fifo.fini()

    async def _backup(self):
        # lockstep a new process which opens at the current
        # transaction and fires a thread to run the copy
        # (confirmed LMDB copy releases the GIL)

        s_common.gendir(self.optpath)

        # TODO: figure out how to run this in a thread...
        with self.slab.lenv.begin(write=False) as txn:
            self.slab.lenv.copy(self.optpath, compact=True, txn=txn)

        self.optslab = await Slab.anit(self.optpath)

    async def _catchup(self):
        # Replay logged transactions onto the copy until it has seen every
        # commit and the source slab has no uncommitted changes.
        while True:

            if self.synced >= self.count and not self.slab.dirty:
                return

            packops = await self.fifo.get()
            for (indx, args, kwargs) in packops:
                # Apply to the copy (not the source) using its own bound methods;
                # both slabs build xactfuncs in the same order.
                self.optslab.xactfuncs[indx](*args, **kwargs)

            self.synced += 1
            await asyncio.sleep(0)

    async def _switchup(self):
        # Swap the compacted copy into the slab's path and reopen the slab on it.
        await self.optslab.fini()

        for scan in self.slab.scans:
            scan.bump()

        # how confident are we feeling?
        # os.rename(self.slab.path, f'{self.slab.path}_old')
        shutil.rmtree(self.slab.path, ignore_errors=True)
        os.rename(self.optpath, self.slab.path)

        self.slab.reopen()

    async def optimize(self):
        '''
        Run the full copy / replay / swap sequence.

        Failures are logged rather than raised; the Optimizer is always
        finalized when this returns.
        '''
        logger.warning(f'Beginning Optimization: {self.slab.path}')
        try:
            await self.slab.sync()

            await self._backup()
            await self._catchup()
            await self._switchup()

        except Exception:
            logger.exception('...optimization failed! Resuming normal operation.')

        finally:
            await self.fini()

class Slab(s_base.Base):
'''
A "monolithic" LMDB instance for use in an asyncio loop thread.
Expand Down Expand Up @@ -790,7 +887,7 @@ def getSlabsInDir(clas, dirn):
if toppath == slab.path or slab.path.startswith(toppath + os.sep)]

@classmethod
async def initSyncLoop(clas, inst):
async def initSyncLoop(clas):

if clas.synctask is not None:
return
Expand Down Expand Up @@ -825,6 +922,20 @@ async def syncLoopOnce(clas):
await slab.sync()
await asyncio.sleep(0)

@contextlib.asynccontextmanager
async def optimizer(self):
    '''
    Async context manager which attaches an Optimizer to this slab.

    Yields:
        Optimizer: The optimizer registered as self._optimizer for the
        duration of the block.

    Raises:
        s_exc.SynErr: If an optimizer is already attached.
    '''
    if self._optimizer is not None:
        raise s_exc.SynErr(mesg='FIXME DUP')

    self._optimizer = await Optimizer.anit(self)

    try:
        yield self._optimizer
    finally:
        # Detach first so the slab is usable again even if fini() raises, and
        # run this on error paths too so a failed block does not leave a stale
        # optimizer attached.
        optimizer, self._optimizer = self._optimizer, None
        await optimizer.fini()

@classmethod
async def getSlabStats(clas):
retn = []
Expand Down Expand Up @@ -858,6 +969,7 @@ async def __anit__(self, path, **kwargs):

self.path = path
self.optspath = s_common.switchext(path, ext='.opts.yaml')
self._optimizer = None

# Make sure we don't have this lmdb DB open already. (This can lead to seg faults)
if path in self.allslabs:
Expand All @@ -883,8 +995,10 @@ async def __anit__(self, path, **kwargs):
self.max_xactops_len = opts.pop('max_replay_log', 10000)
self.recovering = False

opts.setdefault('max_dbs', 128)
opts.setdefault('writemap', True)
self.xactfuncs = [self.put, self.pop, self.delete, self.replace, self._putmulti]
self.xactlooks = {func: indx for (indx, func) in enumerate(self.xactfuncs)}

self.setDefOpts(opts)

self.maxsize = opts.pop('maxsize', None)
self.growsize = opts.pop('growsize', self.DEFAULT_GROWSIZE)
Expand Down Expand Up @@ -955,7 +1069,11 @@ async def memlockfini():
self.commitstats = collections.deque(maxlen=1000) # stores Tuple[time, replayloglen, commit time delta]

if not self.readonly:
await Slab.initSyncLoop(self)
await Slab.initSyncLoop()

def setDefOpts(self, opts):
    '''
    Fill in default open options on opts (in place).

    Values already present in opts are left untouched.
    '''
    defaults = (('max_dbs', 128), ('writemap', True))
    for name, valu in defaults:
        if name not in opts:
            opts[name] = valu

def __repr__(self):
return 'Slab: %r' % (self.path,)
Expand Down Expand Up @@ -1087,6 +1205,9 @@ def _finiCoXact(self):

self.xact.commit()

if self._optimizer:
self._optimizer.writeahead(self.xactops)

self.xactops.clear()

del self.xact
Expand Down Expand Up @@ -1236,7 +1357,7 @@ def _memorylockloop(self):
self.locking_memory = False
logger.debug('memory locking thread ended')

def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False):
def initdb(self, name, dupsort=False):

if name in self.dbnames:
return name
Expand All @@ -1246,11 +1367,10 @@ def initdb(self, name, dupsort=False, integerkey=False, dupfixed=False):
if self.readonly:
# In a readonly environment, we can't make our own write transaction, but we
# can have the lmdb module create one for us by not specifying the transaction
db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort, integerkey=integerkey,
dupfixed=dupfixed)
db = self.lenv.open_db(name.encode('utf8'), create=False, dupsort=dupsort)
else:
db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort, integerkey=integerkey,
dupfixed=dupfixed)
db = self.lenv.open_db(name.encode('utf8'), txn=self.xact, dupsort=dupsort)

self.dirty = True
self.forcecommit()

Expand Down Expand Up @@ -1582,6 +1702,37 @@ def scanByFullBack(self, db=None):

yield from scan.iternext()

def reopen(self):
    '''
    Close the LMDB environment and open it again from self.path.

    Active scans are bumped, the current transaction is finished, the
    environment is reopened with options loaded from self.optspath, and every
    named database is re-registered against the new environment.

    NOTE(review): options originally passed as keyword arguments to the
    constructor are not re-applied here - confirm that is intended.
    '''
    for scan in self.scans:
        scan.bump()

    if not self.readonly:
        self._finiCoXact()

    self.lenv.close()
    del self.lenv

    # Fall back to an empty dict if no options were loaded from the file.
    opts = s_common.yamlload(self.optspath) or {}

    # The constructor pops these slab-level settings before opening the
    # environment; they are not lmdb.open() arguments.
    # NOTE(review): other slab-only keys may exist outside the visible code.
    for name in ('max_replay_log', 'maxsize', 'growsize'):
        opts.pop(name, None)

    self.setDefOpts(opts)

    self.lenv = lmdb.open(self.path, **opts)

    # FIXME: any other thing we need to repopulate?
    self.mapsize = self.lenv.info()['map_size']

    if not self.readonly:
        self._initCoXact()

    # Snapshot first: initdb() mutates self.dbnames while we iterate.
    dbnames = list(self.dbnames.items())

    for (dbname, (_, dupsort)) in dbnames:

        if dbname is None:
            continue

        # Drop the stale handle so initdb() opens the db in the new environment.
        self.dbnames.pop(dbname, None)
        self.initdb(dbname, dupsort=dupsort)

def _initCoXact(self):
try:
self.xact = self.lenv.begin(write=not self.readonly)
Expand All @@ -1594,6 +1745,9 @@ def _initCoXact(self):
self.dirty = False

def _logXactOper(self, func, *args, **kwargs):

assert self.xactlooks.get(func) is not None

self.xactops.append((func, args, kwargs))

if len(self.xactops) == self.max_xactops_len:
Expand Down Expand Up @@ -1819,16 +1973,21 @@ class Scan:
db (str): name of open database on the slab
'''
def __init__(self, slab, db):
self.db = db
self.slab = slab
self.db, self.dupsort = slab.dbnames[db]
_, self.dupsort = slab.dbnames[db]

self.atitem = None
self.bumped = False
self.curs = None

def getScanDb(self):
    '''
    Look up the current database handle for this scan's db name on the slab.
    '''
    # Resolved on every call rather than cached on the scan.
    handle, _ = self.slab.dbnames.get(self.db)
    return handle

def __enter__(self):
self.slab._acqXactForReading()
self.curs = self.slab.xact.cursor(db=self.db)
self.curs = self.slab.xact.cursor(db=self.getScanDb())
self.slab.scans.add(self)
return self

Expand Down Expand Up @@ -1886,7 +2045,7 @@ def iternext(self):

self.bumped = False

self.curs = self.slab.xact.cursor(db=self.db)
self.curs = self.slab.xact.cursor(db=self.getScanDb())

if not self.resume():
raise StopIteration
Expand Down
5 changes: 5 additions & 0 deletions synapse/lib/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,8 @@ def getvars(varz):
continue
items.append(item)
return dict(items)

def unpacker(**kwargs):
    '''
    Construct a msgpack.Unpacker using the module default unpacker options.

    Keyword arguments override the defaults.
    '''
    return msgpack.Unpacker(**{**unpacker_kwargs, **kwargs})
Loading
Loading