Merge pull request #98 from RNAcentral/dev
Do not search for sequences that have already been fetched
carlosribas authored Dec 3, 2019
2 parents e10bb39 + 26e2340 commit 5681a7f
Showing 8 changed files with 159 additions and 108 deletions.
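The change is easiest to read from the bottom up: the jobs table gains a result_in_db flag, set_job_status() stamps it when a job finishes, and a new sequence_exists() helper lets the producer return an existing job instead of re-running the search. A minimal sketch of that flow, assuming the aiopg.sa engine used throughout this repo (submit_or_reuse and the save_job return value are assumptions, not part of the commit):

    from sequence_search.db.jobs import save_job, sequence_exists

    async def submit_or_reuse(engine, query, description):
        # New in this PR: reuse a finished job whose results are already stored
        job_id = await sequence_exists(engine, query)
        if job_id is not None:
            return job_id
        # Otherwise fall back to the normal path (assumes save_job returns the id)
        return await save_job(engine, query, description)
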
3 changes: 2 additions & 1 deletion .gitignore
@@ -20,7 +20,7 @@ __pycache__/
*.pyo
*.retry

ansible/hosts
ansible/hosts*
ansible/producer_latest.dump
node_modules/
ENV/
@@ -29,3 +29,4 @@ sequence_search/consumer/results
sequence_search/consumer/queries
sequence_search/consumer/databases
sequence_search/monitor/.conf-file
sequence_search/monitor/test*
11 changes: 11 additions & 0 deletions ansible/postgres_new_release.yml
@@ -0,0 +1,11 @@
---
# Playbook to update the jobs table and allow users to search for a sequence that already has results in the database
- hosts: postgres
remote_user: centos
become: true
become_method: sudo
vars:
ansible_ssh_private_key_file: "../terraform/sequence_search_rsa"
tasks:
- name: Update jobs to allow users to search for any sequence
command: psql -U docker -c "UPDATE jobs SET result_in_db = NOT result_in_db WHERE result_in_db = TRUE" producer
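
The UPDATE flips result_in_db from TRUE to FALSE so that, after a new data release, previously answered sequences are searched again rather than served from stale results. For ad-hoc use the same reset could presumably be issued through the project's engine rather than psql; a sketch under that assumption (reset_result_in_db is hypothetical):

    async def reset_result_in_db(engine):
        # Equivalent of the playbook's psql command: force all cached
        # jobs to be re-searched against the new release
        async with engine.acquire() as connection:
            await connection.execute(
                "UPDATE jobs SET result_in_db = FALSE WHERE result_in_db = TRUE"
            )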
4 changes: 2 additions & 2 deletions sequence_search/consumer/views/submit_job.py
@@ -24,7 +24,7 @@
from ..rnacentral_databases import query_file_path, result_file_path, consumer_validator
from ..settings import MAX_RUN_TIME
from ...db import DatabaseConnectionError, SQLError
from ...db.models import CONSUMER_STATUS_CHOICES, JOB_STATUS_CHOICES, JOB_CHUNK_STATUS_CHOICES
from ...db.models import CONSUMER_STATUS_CHOICES, JOB_CHUNK_STATUS_CHOICES
from ...db.job_chunk_results import set_job_chunk_results
from ...db.job_chunks import get_consumer_ip_from_job_chunk, get_job_chunk_from_job_and_database, \
set_job_chunk_status, set_job_chunk_consumer
@@ -152,7 +152,7 @@ async def submit_job(request):
job_id = data['job_id']
sequence = data['sequence']
database = data['database']
consumer_ip = get_ip(request.app) # 'host.docker.internal'
consumer_ip = get_ip(request.app) # 'host.docker.internal'

# if request was successful, save the consumer state and job_chunk state to the database
try:
8 changes: 8 additions & 0 deletions sequence_search/db/job_chunks.py
@@ -60,6 +60,14 @@ async def get_job_chunk_from_job_and_database(engine, job_id, database):


async def save_job_chunk(engine, job_id, database):
"""
Jobs are divided into chunks and each chunk searches for sequences in a piece of the database.
Here we are saving a job chunk for a specific fasta file.
:param engine: params to connect to the db
:param job_id: id of the job
:param database: fasta file with RNAcentral data
:return: id of the job chunk
"""
try:
async with engine.acquire() as connection:
try:
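
As the new docstring says, a job fans out into one chunk per fasta file. A sketch of that fan-out, assuming the database names used elsewhere in this PR (create_chunks and the loop wiring are illustrative):

    from sequence_search.db.job_chunks import save_job_chunk

    async def create_chunks(engine, job_id, databases=("mirbase", "pdbe", "snopy")):
        # One chunk per fasta file; save_job_chunk returns the new job_chunk id
        return [await save_job_chunk(engine, job_id, db) for db in databases]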
55 changes: 37 additions & 18 deletions sequence_search/db/jobs.py
@@ -29,6 +29,28 @@ def __str__(self):
return "Job '%s' not found" % self.job_id


async def sequence_exists(engine, query):
"""
Check if this query has already been searched
:param engine: params to connect to the db
:param query: the sequence that the user wants to search
:return: job_id if this query is in the db, otherwise returns none
"""
try:
async with engine.acquire() as connection:
try:
sql_query = sa.select([Job.c.id]).select_from(Job).where(
(Job.c.query == query) & Job.c.result_in_db
)
async for row in connection.execute(sql_query):
return row[0] if row else None
except Exception as e:
raise SQLError("Failed to check if query exists for query = %s" % query) from e
except psycopg2.Error as e:
raise DatabaseConnectionError("Failed to open connection to the database in sequence_exists() for "
"sequence with query = %s" % query) from e


async def get_job(engine, job_id):
try:
async with engine.acquire() as connection:
@@ -81,8 +103,8 @@ async def save_job(engine, query, description):
raise SQLError("Failed to save job for query = %s, "
"description = %s to the database" % (query, description)) from e
except psycopg2.Error as e:
raise DatabaseConnectionError("Failed to open connection to the database in "
"save_job() for job with job_id = %s" % job_id) from e
raise DatabaseConnectionError("Failed to open connection to the database in save_job() for job with "
"job_id = %s" % job_id) from e


async def set_job_status(engine, job_id, status):
@@ -96,11 +118,12 @@ async def set_job_status(engine, job_id, status):
try:
async with engine.acquire() as connection:
try:
query = sa.text('''UPDATE jobs SET status = :status, finished = :finished WHERE id = :job_id''')
await connection.execute(query, job_id=job_id, status=status, finished=finished)
query = sa.text('''UPDATE jobs SET status = :status, finished = :finished,
result_in_db = :result_in_db WHERE id = :job_id''')
await connection.execute(query, job_id=job_id, status=status, finished=finished, result_in_db=True)
except Exception as e:
raise SQLError("Failed to save job to the database about failed job, "
"job_id = %s, status = %s" % (job_id, status)) from e
raise SQLError("Failed to save job to the database about failed job, job_id = %s, "
"status = %s" % (job_id, status)) from e
except psycopg2.Error as e:
raise DatabaseConnectionError("Failed to open connection to the database in set_job_status() "
"for job with job_id = %s" % job_id) from e
@@ -125,18 +148,14 @@ async def get_jobs_statuses(engine):
jobs_dict = {}
async for row in connection.execute(query):
if row.job_id not in jobs_dict:
jobs_dict[row.job_id] = {
'id': row.job_id,
'status': row.job_status,
'submitted': str(row.submitted),
'chunks': [
{
'database': row.database,
'status': row.status,
'consumer': row.consumer
}
]
}
jobs_dict[row.job_id] = {
'id': row.job_id,
'status': row.job_status,
'submitted': str(row.submitted),
'chunks': [
{'database': row.database, 'status': row.status, 'consumer': row.consumer}
]
}
else:
jobs_dict[row.job_id]['chunks'].append({
'database': row.database,
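
Taken together, set_job_status() now always writes result_in_db = TRUE when a job's final status lands, and sequence_exists() keys on that flag. An illustrative end-to-end check, assuming a test engine, a save_job that returns the job id, and the 'success' status literal used by the monitor script (none of this test is part of the commit):

    from sequence_search.db.jobs import save_job, sequence_exists, set_job_status

    async def check_result_reuse(engine):
        job_id = await save_job(engine, 'ACGUACGU', description=None)
        assert await sequence_exists(engine, 'ACGUACGU') is None   # flag not set yet
        await set_job_status(engine, job_id, 'success')            # also sets result_in_db
        assert await sequence_exists(engine, 'ACGUACGU') == job_id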
78 changes: 40 additions & 38 deletions sequence_search/db/models.py
@@ -74,53 +74,54 @@ class CONSUMER_STATUS_CHOICES(object):
# TODO: consistent naming for tables: either 'jobs' and 'consumers' or 'job' and 'consumer'
"""State of a consumer instance"""
Consumer = sa.Table('consumer', metadata,
sa.Column('ip', sa.String(20), primary_key=True),
sa.Column('status', sa.String(255)), # choices=CONSUMER_STATUS_CHOICES, default='available'
sa.Column('job_chunk_id', sa.ForeignKey('job_chunks.id')),
sa.Column('port', sa.String(10)))
sa.Column('ip', sa.String(20), primary_key=True),
sa.Column('status', sa.String(255)), # choices=CONSUMER_STATUS_CHOICES, default='available'
sa.Column('job_chunk_id', sa.ForeignKey('job_chunks.id')),
sa.Column('port', sa.String(10)))

"""A search job that is divided into multiple job chunks per database"""
Job = sa.Table('jobs', metadata,
sa.Column('id', sa.String(36), primary_key=True),
sa.Column('query', sa.Text),
sa.Column('description', sa.Text, nullable=True),
sa.Column('ordering', sa.Text, nullable=True),
sa.Column('submitted', sa.DateTime),
sa.Column('finished', sa.DateTime, nullable=True),
sa.Column('status', sa.String(255))) # choices=JOB_STATUS_CHOICES, default='started'
sa.Column('id', sa.String(36), primary_key=True),
sa.Column('query', sa.Text),
sa.Column('description', sa.Text, nullable=True),
sa.Column('ordering', sa.Text, nullable=True),
sa.Column('submitted', sa.DateTime),
sa.Column('finished', sa.DateTime, nullable=True),
sa.Column('result_in_db', sa.Boolean),
sa.Column('status', sa.String(255))) # choices=JOB_STATUS_CHOICES

"""Part of the search job, run against a specific database and assigned to a specific consumer"""
JobChunk = sa.Table('job_chunks', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('job_id', sa.String(36), sa.ForeignKey('jobs.id')),
sa.Column('database', sa.String(255)),
sa.Column('submitted', sa.DateTime, nullable=True),
sa.Column('finished', sa.DateTime, nullable=True),
sa.Column('consumer', sa.ForeignKey('consumer.ip'), nullable=True),
sa.Column('status', sa.String(255))) # choices=JOB_CHUNK_STATUS_CHOICES, default='started'
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('job_id', sa.String(36), sa.ForeignKey('jobs.id')),
sa.Column('database', sa.String(255)),
sa.Column('submitted', sa.DateTime, nullable=True),
sa.Column('finished', sa.DateTime, nullable=True),
sa.Column('consumer', sa.ForeignKey('consumer.ip'), nullable=True),
sa.Column('status', sa.String(255))) # choices=JOB_CHUNK_STATUS_CHOICES, default='started'

"""Result of a specific JobChunk"""
JobChunkResult = sa.Table('job_chunk_results', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('job_chunk_id', None, sa.ForeignKey('job_chunks.id')),
sa.Column('rnacentral_id', sa.String(255)),
sa.Column('description', sa.Text, nullable=True),
sa.Column('score', sa.Float),
sa.Column('bias', sa.Float),
sa.Column('e_value', sa.Float),
sa.Column('target_length', sa.Integer),
sa.Column('alignment', sa.Text),
sa.Column('alignment_length', sa.Integer),
sa.Column('gap_count', sa.Integer),
sa.Column('match_count', sa.Integer),
sa.Column('nts_count1', sa.Integer),
sa.Column('nts_count2', sa.Integer),
sa.Column('identity', sa.Float),
sa.Column('query_coverage', sa.Float),
sa.Column('target_coverage', sa.Float),
sa.Column('gaps', sa.Float),
sa.Column('query_length', sa.Integer),
sa.Column('result_id', sa.Integer))
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('job_chunk_id', None, sa.ForeignKey('job_chunks.id')),
sa.Column('rnacentral_id', sa.String(255)),
sa.Column('description', sa.Text, nullable=True),
sa.Column('score', sa.Float),
sa.Column('bias', sa.Float),
sa.Column('e_value', sa.Float),
sa.Column('target_length', sa.Integer),
sa.Column('alignment', sa.Text),
sa.Column('alignment_length', sa.Integer),
sa.Column('gap_count', sa.Integer),
sa.Column('match_count', sa.Integer),
sa.Column('nts_count1', sa.Integer),
sa.Column('nts_count2', sa.Integer),
sa.Column('identity', sa.Float),
sa.Column('query_coverage', sa.Float),
sa.Column('target_coverage', sa.Float),
sa.Column('gaps', sa.Float),
sa.Column('query_length', sa.Integer),
sa.Column('result_id', sa.Integer))


# Migrations
@@ -160,6 +161,7 @@ async def migrate(ENVIRONMENT):
ordering TEXT,
submitted TIMESTAMP,
finished TIMESTAMP,
result_in_db BOOLEAN,
status VARCHAR(255))
''')

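
The migration above only covers freshly created databases; an existing producer database would presumably need the column added in place, along these lines (hypothetical, not part of the commit):

    async def add_result_in_db_column(engine):
        # One-off schema change for databases created before this commit
        async with engine.acquire() as connection:
            await connection.execute('ALTER TABLE jobs ADD COLUMN result_in_db BOOLEAN')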
20 changes: 12 additions & 8 deletions sequence_search/monitor/monitor.sh
@@ -1,4 +1,5 @@
# Script to regularly run a specific search and check the results
# Script to regularly run a search and check the results.
# Each job searches a different sequence with a different size to avoid retrieving database results.

# -o allexport enables all following variable definitions to be exported.
# +o allexport disables this feature.
@@ -7,7 +8,11 @@ source $PWD/.conf-file
set +o allexport

CONTENT_TYPE="Content-Type: application/json"
DATABASE_AND_QUERY="{\"databases\": [\"mirbase\"], \"query\": \">sequence-search-test\nCUGUACUAUCUACUGUCUCUC\"}"
SIZE=$(( $RANDOM % 5 + 20 ))
NEW_SEQUENCE=$(cat /dev/urandom | env LC_CTYPE=C tr -dc ACGTU | head -c $SIZE)
DATABASES_LIST=("flybase" "gencode" "mirbase" "pdbe" "snopy" "srpdb")
DATABASE=${DATABASES_LIST[RANDOM%${#DATABASES_LIST[@]}]}
DATABASE_AND_QUERY="{\"databases\": [\"$DATABASE\"], \"query\": \">sequence-search-test\n$NEW_SEQUENCE\"}"

# Run search on the correct target (test or default).
# Ansible adds the correct floating_ip to the .conf-file
@@ -25,11 +30,11 @@ if curl -s --head --request GET $HOST | grep "200 OK" > /dev/null; then
PAUSE="0"
if [ "$STATUS" == "started" ] || [ "$STATUS" == "pending" ]
then
while [ $PAUSE -lt 30 ]
while [ $PAUSE -lt 300 ]
do
sleep 60
sleep 10
STATUS=$(curl -s ${HOST}/api/job-status/$JOB_ID | jq -r '.chunks | .[] | .status')
if [ "$STATUS" == "success" ] || [ "$STATUS" == "error" ] || [ "$STATUS" == "timeout" ]
if [ "$STATUS" == "success" ] || [ "$STATUS" = "partial_success" ] || [ "$STATUS" == "error" ] || [ "$STATUS" == "timeout" ]
then
break
fi
@@ -38,21 +43,20 @@ if curl -s --head --request GET $HOST | grep "200 OK" > /dev/null; then
fi

# Facets search
# Expected response: "hitCount = 23 and textSearchError = false"
FACETS_SEARCH=$(curl -s ${HOST}/api/facets-search/$JOB_ID | jq '[.hitCount,.textSearchError]')
HIT_COUNT=$(echo ${FACETS_SEARCH} | jq '.[0]')
SEARCH_ERROR=$(echo ${FACETS_SEARCH} | jq '.[1]')

# Send message in case of unexpected result
if [ "$STATUS" != "success" ] || [ "$HIT_COUNT" != "23" ] || [ "$SEARCH_ERROR" != "false" ]
if [ "$STATUS" != "success" ] || [ "$HIT_COUNT" -gt 0 ] && [ "$SEARCH_ERROR" != "false" ] || [ "$HIT_COUNT" == 0 ] && [ "$SEARCH_ERROR" != "true" ]
then
text="Ops! There is something wrong with the RNAcentral sequence search. Please check the links below: \n
\n
To check the status see: ${HOST}/api/job-status/$JOB_ID \n
The job status is ${STATUS} (the expected status is success). \n
\n
To check the results see: ${HOST}/api/facets-search/$JOB_ID \n
The job results are ${HIT_COUNT} and ${SEARCH_ERROR} (the expected results are 23 and false)."
The job results are ${HIT_COUNT} and ${SEARCH_ERROR}."
escapedText=$(echo ${text} | sed 's/"/\"/g' | sed "s/'/\'/g" )
json="{\"text\": \"$escapedText\"}"
curl -s -d "payload=$json" $WEBHOOK_URL
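
The monitor now probes with a random 20-24 nt sequence against a random database, so its own test jobs never hit the new result cache. The bash pipeline is equivalent to this Python sketch (for clarity only; the deployed script stays in bash):

    import random

    def random_probe():
        size = random.randint(20, 24)  # matches SIZE=$(( RANDOM % 5 + 20 ))
        sequence = ''.join(random.choice('ACGTU') for _ in range(size))
        database = random.choice(['flybase', 'gencode', 'mirbase', 'pdbe', 'snopy', 'srpdb'])
        return {'databases': [database], 'query': '>sequence-search-test\n' + sequence}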