Skip to content

Commit

Permalink
Merge pull request #29 from 4dn-dcic/get_es_speedup
Browse files Browse the repository at this point in the history
Add filters support to get_es_metadata
  • Loading branch information
Carl Vitzthum authored Oct 26, 2018
2 parents 7fe1c78 + b3ee2b4 commit ade2cb8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dcicutils/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "0.5.1"
__version__ = "0.5.2"
61 changes: 54 additions & 7 deletions dcicutils/ff_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def delete_field(obj_id, del_field, key=None, ff_env=None):
return get_response_json(response)


def get_es_search_generator(es_client, index, body, page_size=50):
def get_es_search_generator(es_client, index, body, page_size=200):
"""
Simple generator behind get_es_metadata which takes an es_client (from
es_utils create_es_client), a string index, and a dict query body.
Expand All @@ -348,13 +348,28 @@ def get_es_search_generator(es_client, index, body, page_size=50):
yield es_hits


def get_es_metadata(uuids, es_client=None, key=None, ff_env=None):
def get_es_metadata(uuids, es_client=None, filters={}, chunk_size=200,
key=None, ff_env=None):
"""
Given a list of string item uuids, will return a
dictionary response of the full ES record for those items (or an empty
dictionary if the items don't exist/ are not indexed)
You can pass in an Elasticsearch client (initialized by create_es_client)
through the es_client param to save init time.
Advanced users can optionally pass a dict of filters that will be added
to the Elasticsearch query.
For example: filters={'status': 'released'}
You can also specify NOT fields:
example: filters={'status': '!released'}
You can also specify lists of values for fields:
example: filters={'status': ['released', 'archived']}
NOTES:
- different filter fields are combined using AND queries (must all match)
example: filters={'status': ['released'], 'public_release': ['2018-01-01']}
- values for the same field are combined with OR (such as multiple statuses)
Integer chunk_size may be used to control the number of uuids that are
passed to Elasticsearch in each query; setting this too high may cause
ES reads to timeout.
Same auth mechanism as the other metadata functions
"""
if es_client is None:
Expand All @@ -364,11 +379,43 @@ def get_es_metadata(uuids, es_client=None, key=None, ff_env=None):
# sending in too many uuids in the terms query can crash es; break them up
# into groups of max size chunk_size
es_res = []
for i in range(0, len(uuids), 100):
query_uuids = uuids[i:i + 100]
es_query = {'query': {'terms': {'_id': query_uuids}},
'sort': [{'_uid': {'order': 'desc'}}]}
for es_page in get_es_search_generator(es_client, '_all', es_query):
for i in range(0, len(uuids), chunk_size):
query_uuids = uuids[i:i + chunk_size]
es_query = {
'query': {
'bool': {
'must': [
{'terms': {'_id': query_uuids}}
],
'must_not': []
}
},
'sort': [{'_uid': {'order': 'desc'}}]
}
if filters:
if not isinstance(filters, dict):
print('Invalid filter for get_es_metadata: %s' % filters)
else:
for k, v in filters.items():
key_terms = []
key_not_terms = []
iter_terms = [v] if not isinstance(v, list) else v
for val in iter_terms:
if val.startswith('!'):
key_not_terms.append(val[1:])
else:
key_terms.append(val)
if key_terms:
es_query['query']['bool']['must'].append(
{'terms': {'embedded.' + k + '.raw': key_terms}}
)
if key_not_terms:
es_query['query']['bool']['must_not'].append(
{'terms': {'embedded.' + k + '.raw': key_not_terms}}
)
# use chunk_size as page size for performance reasons
for es_page in get_es_search_generator(es_client, '_all', es_query,
page_size=chunk_size):
# return the document source only; eliminate es metadata
es_res.extend([hit['_source'] for hit in es_page])
return es_res
Expand Down
24 changes: 24 additions & 0 deletions test/test_ff_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dcicutils import ff_utils
import pytest
import json
pytestmark = pytest.mark.working


Expand Down Expand Up @@ -218,6 +219,11 @@ def test_get_metadata(integrated_ff, basestring):

# testing check_queues functionality requires patching
ff_utils.patch_metadata({'description': 'test description'}, obj_id=test_item, key=integrated_ff['ff_key'])
# add a bunch more stuff to the queue
idx_body = json.dumps({'uuids': [test_item], 'target_queue': 'secondary'})
for i in range(10):
ff_utils.authorized_request(integrated_ff['ff_key']['server'] + '/queue_indexing',
auth=integrated_ff['ff_key'], verb='POST', data=idx_body)
res_w_check = ff_utils.get_metadata(test_item, key=integrated_ff['ff_key'],
ff_env=integrated_ff['ff_env'], check_queue=True)
res_db = ff_utils.get_metadata(test_item, key=integrated_ff['ff_key'],
Expand Down Expand Up @@ -382,6 +388,24 @@ def test_get_es_metadata(integrated_ff):
all_es_uuids = [item['uuid'] for item in all_es]
assert set(all_es_uuids) == set(all_uuids)

# make sure filters work with the search
bios_in_rev = ff_utils.search_metadata('/search/?type=Biosample&frame=object&status=in+review+by+lab',
key=integrated_ff['ff_key'])
bios_replaced = ff_utils.search_metadata('/search/?type=Biosample&frame=object&status=replaced',
key=integrated_ff['ff_key'])
bios_uuids = [item['uuid'] for item in bios_in_rev + bios_replaced]
all_uuids.extend(bios_uuids) # add the replaced biosample uuids
filters = {'status': ['in review by lab', 'replaced'], '@type': ['Biosample']}
bios_es = ff_utils.get_es_metadata(all_uuids, filters=filters, key=integrated_ff['ff_key'])
assert set([item['uuid'] for item in bios_es]) == set(bios_uuids)

bios_neg_search = ('/search/?type=Biosample&frame=object&status=in+review+by+lab'
'&modifications.modification_type!=Other')
bios_neg_res = ff_utils.search_metadata(bios_neg_search, key=integrated_ff['ff_key'])
filters2 = {'status': ['in review by lab'], 'modifications.modification_type': ['!Other'], '@type': ['Biosample']}
bios_neg_es = ff_utils.get_es_metadata(all_uuids, filters=filters2, key=integrated_ff['ff_key'])
assert set([item['uuid'] for item in bios_neg_es]) == set(item['uuid'] for item in bios_neg_res)


@pytest.mark.integrated
def test_get_es_search_generator(integrated_ff):
Expand Down

0 comments on commit ade2cb8

Please sign in to comment.