Use/remove `SqlaGroup.add_nodes`/`remove_nodes`'s `skip_orm` flag? #5453
We added this back in the day because the performance of adding to groups was really bad: the raw SQL will often take order(s) of magnitude less time compared to the call through our ORM. With the …
Yep, still a problem:

```python
In [1]: group = Group('test')

In [2]: group.store()
Out[2]: <Group: 'test' [type core], of user mail@sphuber.net>

In [3]: nodes = [Data().store() for _ in range(1000)]

In [4]: backend_nodes = [n.backend_entity for n in nodes]

In [5]: %timeit group.add_nodes(nodes)
1.94 s ± 175 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: group.clear()

In [7]: %timeit group.backend_entity.add_nodes(backend_nodes)
2.01 s ± 49.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [8]: group.clear()

In [9]: %timeit group.backend_entity.add_nodes(backend_nodes, skip_orm=True)
37.9 ms ± 1.48 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
OK, cheers. I think there may be a better way to do this, though, in order to expose it on the front end. Will look into it.
We'll be having a talented Master's student who'll be looking into improving bulk SQL operations, such as what is outlined here. Have there, by any chance, been any developments on this topic in recent years?
Not really; I think the state of this conversation is still pretty accurate.
TLDR: Appending the nodes one by one in `add_nodes` is the main bottleneck.

I analyzed the implementation of `add_nodes`; my hypotheses were that the per-node `append` on the ORM relationship and the `session.flush()` after every node are responsible for the slowdown. To confirm these hypotheses, I modified the function slightly and performed a benchmark. Two optional parameters are added to `add_nodes`: `skip_flush` and `use_extend`.
The modified method can be found here for reference:

```python
def add_nodes(self, nodes, **kwargs):
    """Add a node or a set of nodes to the group.

    :note: all the nodes *and* the group itself have to be stored.

    :param nodes: a list of `BackendNode` instances to be added to this group
    :param kwargs:
        skip_orm: When the flag is on, the SQLA ORM is skipped and SQLA is used
            to create a direct SQL INSERT statement to the group-node relationship
            table (to improve speed).
        skip_flush: When on, do not flush the session after each appended node
            (only used in the ORM branch; added for benchmarking).
        use_extend: When on, add all nodes with a single ``extend`` call instead
            of one ``append`` per node (added for benchmarking).
    """
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy.exc import IntegrityError

    super().add_nodes(nodes)
    skip_orm = kwargs.get('skip_orm', False)
    skip_flush = kwargs.get('skip_flush', False)
    use_extend = kwargs.get('use_extend', False)

    def check_node(given_node):
        """Check if given node is of correct type and stored"""
        if not isinstance(given_node, self.NODE_CLASS):
            raise TypeError(f'invalid type {type(given_node)}, has to be {self.NODE_CLASS}')
        if not given_node.is_stored:
            raise ValueError('At least one of the provided nodes is unstored, stopping...')

    with utils.disable_expire_on_commit(self.backend.get_session()) as session:
        if not skip_orm and not use_extend:
            # Get dbnodes here ONCE, otherwise each call to dbnodes will re-read the current value in the database
            dbnodes = self.model.dbnodes
            for node in nodes:
                check_node(node)
                # Use pattern as suggested here:
                # http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint
                try:
                    with session.begin_nested():
                        dbnodes.append(node.bare_model)
                        # this branch will find out how much overhead frequent flushes make
                        if not skip_flush:
                            session.flush()
                except IntegrityError:
                    # Duplicate entry, skip
                    pass
        # this branch will check if using extend instead of append is more efficient
        elif not skip_orm:
            dbnodes = self.model.dbnodes
            for node in nodes:
                check_node(node)
            try:
                with session.begin_nested():
                    dbnodes.extend([node.bare_model for node in nodes])
            except IntegrityError:
                pass
        else:
            ins_dict = []
            for node in nodes:
                check_node(node)
                ins_dict.append({'dbnode_id': node.id, 'dbgroup_id': self.id})
            table = self.GROUP_NODE_CLASS.__table__
            ins = insert(table).values(ins_dict)
            session.execute(ins.on_conflict_do_nothing(index_elements=['dbnode_id', 'dbgroup_id']))

        # Commit everything as up till now we've just flushed
        if not session.in_nested_transaction():
            session.commit()
```

And the benchmark script is shown here:

```python
@pytest.mark.benchmark(group='Group')
@pytest.mark.usefixtures('aiida_profile_clean')
@pytest.mark.parametrize('skip_orm', [True, False], ids=['skip_orm', 'with_orm'])
@pytest.mark.parametrize('skip_flush', [True, False], ids=['skip_flush', 'with_flush'])
@pytest.mark.parametrize('use_extend', [True, False], ids=['extend', 'append'])
def test_group_add_nodes(benchmark, skip_orm, skip_flush, use_extend):
    num_nodes = 100
    nodes = []
    for _ in range(num_nodes):
        node = orm.Data().store()
        nodes.append(node.backend_entity)

    def _setup():
        id = uuid.uuid4()
        group = orm.Group(label=id.hex).store().backend_entity
        return (group, nodes), {}

    def _run(group, nodes):
        group.add_nodes(nodes, skip_orm=skip_orm, skip_flush=skip_flush, use_extend=use_extend)

    benchmark.pedantic(_run, setup=_setup, rounds=10)
```
The results of the benchmark confirm that the main bottleneck is appending the nodes one by one in the ORM implementation, causing excessive database interactions. Frequent …
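As a side note, one way to make the "excessive database interactions" concrete is to count the statements the engine actually emits. The following is a standalone sketch, not AiiDA code: the `membership` table and all names are made up for illustration, and an in-memory SQLite database stands in for PostgreSQL.

```python
# Standalone sketch (assumed names): count the SQL statements emitted when
# inserting rows one by one versus in a single bulk INSERT.
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.orm import Session

engine = sa.create_engine('sqlite://')  # in-memory database for the demo
metadata = sa.MetaData()
membership = sa.Table(
    'membership', metadata,
    sa.Column('dbnode_id', sa.Integer, primary_key=True),
    sa.Column('dbgroup_id', sa.Integer, primary_key=True),
)
metadata.create_all(engine)

statements = []

@event.listens_for(engine, 'before_cursor_execute')
def _record(conn, cursor, statement, parameters, context, executemany):
    statements.append(statement)

with Session(engine) as session:
    # One-by-one: one INSERT statement (and round trip) per row.
    for i in range(100):
        session.execute(sa.insert(membership).values(dbnode_id=i, dbgroup_id=1))
    session.commit()
per_row = len(statements)

statements.clear()
with Session(engine) as session:
    # Bulk: a single executemany-style INSERT carries all 100 rows.
    session.execute(sa.insert(membership), [{'dbnode_id': i, 'dbgroup_id': 2} for i in range(100)])
    session.commit()

print(f'per-row: {per_row} statements, bulk: {len(statements)} statement(s)')
```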
I understand that the reason append-and-flush is used is that it wants to mimic the behavior of …
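Worth noting: on PostgreSQL, the duplicate-skipping behavior that the per-node append/flush/savepoint pattern emulates comes in a single statement via `INSERT ... ON CONFLICT DO NOTHING`, which is what the `skip_orm` branch above already uses. A minimal standalone sketch, with table and column names assumed to resemble the group-node join table (PostgreSQL dialect only):

```python
# Minimal sketch (assumed names; requires the PostgreSQL dialect): insert all
# (group, node) pairs in one statement, silently skipping pairs that already
# exist, instead of one savepoint + flush per node.
from sqlalchemy import Column, Integer, MetaData, Table, UniqueConstraint
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
group_nodes = Table(
    'db_dbgroup_dbnodes', metadata,
    Column('id', Integer, primary_key=True),
    Column('dbnode_id', Integer, nullable=False),
    Column('dbgroup_id', Integer, nullable=False),
    UniqueConstraint('dbgroup_id', 'dbnode_id'),
)

def add_nodes_bulk(connection, group_id, node_ids):
    """Add all nodes to the group at once; rows that already exist are skipped."""
    stmt = insert(group_nodes).values(
        [{'dbnode_id': nid, 'dbgroup_id': group_id} for nid in node_ids]
    ).on_conflict_do_nothing(index_elements=['dbnode_id', 'dbgroup_id'])
    connection.execute(stmt)
```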
The `skip_orm` flag was added to these methods in b40b2ca and bced84e. However, it is not currently used anywhere in the code base:

- For `add_nodes`, it was effectively superseded by `StorageBackend.bulk_insert`, for importing archives.
- For `remove_nodes`, I'm not sure if it was ever used (@sphuber?).

A place one might want to use it is in `aiida/cmdline/commands/cmd_group.py::group_remove_nodes`.

I would note, though, that the performance of bulk insertions/deletions on the session has been improved in `sqlalchemy>=1.4`, so it might be worth double-checking any perceived performance improvements.