diff --git a/.gitignore b/.gitignore index c15bda1c..c57bd019 100644 --- a/.gitignore +++ b/.gitignore @@ -20,7 +20,7 @@ __pycache__/ *.pyo *.retry -ansible/hosts +ansible/hosts* ansible/producer_latest.dump node_modules/ ENV/ @@ -29,3 +29,4 @@ sequence_search/consumer/results sequence_search/consumer/queries sequence_search/consumer/databases sequence_search/monitor/.conf-file +sequence_search/monitor/test* diff --git a/ansible/postgres_new_release.yml b/ansible/postgres_new_release.yml new file mode 100644 index 00000000..80eedd42 --- /dev/null +++ b/ansible/postgres_new_release.yml @@ -0,0 +1,11 @@ +--- +# Playbook to update the jobs table and allow users to search for a sequence that already has results in the database +- hosts: postgres + remote_user: centos + become: true + become_method: sudo + vars: + ansible_ssh_private_key_file: "../terraform/sequence_search_rsa" + tasks: + - name: Update jobs to allow users to search for any sequence + command: psql -U docker -c "UPDATE jobs SET result_in_db = NOT result_in_db WHERE result_in_db = TRUE" producer diff --git a/sequence_search/consumer/views/submit_job.py b/sequence_search/consumer/views/submit_job.py index 5b32bd35..86f21ee8 100644 --- a/sequence_search/consumer/views/submit_job.py +++ b/sequence_search/consumer/views/submit_job.py @@ -24,7 +24,7 @@ from ..rnacentral_databases import query_file_path, result_file_path, consumer_validator from ..settings import MAX_RUN_TIME from ...db import DatabaseConnectionError, SQLError -from ...db.models import CONSUMER_STATUS_CHOICES, JOB_STATUS_CHOICES, JOB_CHUNK_STATUS_CHOICES +from ...db.models import CONSUMER_STATUS_CHOICES, JOB_CHUNK_STATUS_CHOICES from ...db.job_chunk_results import set_job_chunk_results from ...db.job_chunks import get_consumer_ip_from_job_chunk, get_job_chunk_from_job_and_database, \ set_job_chunk_status, set_job_chunk_consumer @@ -152,7 +152,7 @@ async def submit_job(request): job_id = data['job_id'] sequence = data['sequence'] database = data['database'] - consumer_ip = get_ip(request.app) # 'host.docker.internal' + consumer_ip = get_ip(request.app) # 'host.docker.internal' # if request was successful, save the consumer state and job_chunk state to the database try: diff --git a/sequence_search/db/job_chunks.py b/sequence_search/db/job_chunks.py index 30021440..6d97a355 100644 --- a/sequence_search/db/job_chunks.py +++ b/sequence_search/db/job_chunks.py @@ -60,6 +60,14 @@ async def get_job_chunk_from_job_and_database(engine, job_id, database): async def save_job_chunk(engine, job_id, database): + """ + Jobs are divided into chunks and each chunk searches for sequences in a piece of the database. + Here we are saving a job chunk for a specific fasta file. + :param engine: params to connect to the db + :param job_id: id of the job + :param database: fasta file with RNAcentral data + :return: id of the job chunk + """ try: async with engine.acquire() as connection: try: diff --git a/sequence_search/db/jobs.py b/sequence_search/db/jobs.py index bcdaa92c..1c3e0ccf 100644 --- a/sequence_search/db/jobs.py +++ b/sequence_search/db/jobs.py @@ -29,6 +29,28 @@ def __str__(self): return "Job '%s' not found" % self.job_id +async def sequence_exists(engine, query): + """ + Check if this query has already been searched + :param engine: params to connect to the db + :param query: the sequence that the user wants to search + :return: job_id if this query is in the db, otherwise returns none + """ + try: + async with engine.acquire() as connection: + try: + sql_query = sa.select([Job.c.id]).select_from(Job).where( + (Job.c.query == query) & Job.c.result_in_db + ) + async for row in connection.execute(sql_query): + return row[0] if row else None + except Exception as e: + raise SQLError("Failed to check if query exists for query = %s" % query) from e + except psycopg2.Error as e: + raise DatabaseConnectionError("Failed to open connection to the database in sequence_exists() for " + "sequence with query = %s" % query) from e + + async def get_job(engine, job_id): try: async with engine.acquire() as connection: @@ -81,8 +103,8 @@ async def save_job(engine, query, description): raise SQLError("Failed to save job for query = %s, " "description = %s to the database" % (query, description)) from e except psycopg2.Error as e: - raise DatabaseConnectionError("Failed to open connection to the database in " - "save_job() for job with job_id = %s" % job_id) from e + raise DatabaseConnectionError("Failed to open connection to the database in save_job() for job with " + "job_id = %s" % job_id) from e async def set_job_status(engine, job_id, status): @@ -96,11 +118,12 @@ async def set_job_status(engine, job_id, status): try: async with engine.acquire() as connection: try: - query = sa.text('''UPDATE jobs SET status = :status, finished = :finished WHERE id = :job_id''') - await connection.execute(query, job_id=job_id, status=status, finished=finished) + query = sa.text('''UPDATE jobs SET status = :status, finished = :finished, + result_in_db = :result_in_db WHERE id = :job_id''') + await connection.execute(query, job_id=job_id, status=status, finished=finished, result_in_db=True) except Exception as e: - raise SQLError("Failed to save job to the database about failed job, " - "job_id = %s, status = %s" % (job_id, status)) from e + raise SQLError("Failed to save job to the database about failed job, job_id = %s, " + "status = %s" % (job_id, status)) from e except psycopg2.Error as e: raise DatabaseConnectionError("Failed to open connection to the database in set_job_status() " "for job with job_id = %s" % job_id) from e @@ -125,18 +148,14 @@ async def get_jobs_statuses(engine): jobs_dict = {} async for row in connection.execute(query): if row.job_id not in jobs_dict: - jobs_dict[row.job_id] = { - 'id': row.job_id, - 'status': row.job_status, - 'submitted': str(row.submitted), - 'chunks': [ - { - 'database': row.database, - 'status': row.status, - 'consumer': row.consumer - } - ] - } + jobs_dict[row.job_id] = { + 'id': row.job_id, + 'status': row.job_status, + 'submitted': str(row.submitted), + 'chunks': [ + {'database': row.database, 'status': row.status, 'consumer': row.consumer} + ] + } else: jobs_dict[row.job_id]['chunks'].append({ 'database': row.database, diff --git a/sequence_search/db/models.py b/sequence_search/db/models.py index b43ece0a..d68cecf4 100644 --- a/sequence_search/db/models.py +++ b/sequence_search/db/models.py @@ -74,53 +74,54 @@ class CONSUMER_STATUS_CHOICES(object): # TODO: consistent naming for tables: either 'jobs' and 'consumers' or 'job' and 'consumer' """State of a consumer instance""" Consumer = sa.Table('consumer', metadata, - sa.Column('ip', sa.String(20), primary_key=True), - sa.Column('status', sa.String(255)), # choices=CONSUMER_STATUS_CHOICES, default='available' - sa.Column('job_chunk_id', sa.ForeignKey('job_chunks.id')), - sa.Column('port', sa.String(10))) + sa.Column('ip', sa.String(20), primary_key=True), + sa.Column('status', sa.String(255)), # choices=CONSUMER_STATUS_CHOICES, default='available' + sa.Column('job_chunk_id', sa.ForeignKey('job_chunks.id')), + sa.Column('port', sa.String(10))) """A search job that is divided into multiple job chunks per database""" Job = sa.Table('jobs', metadata, - sa.Column('id', sa.String(36), primary_key=True), - sa.Column('query', sa.Text), - sa.Column('description', sa.Text, nullable=True), - sa.Column('ordering', sa.Text, nullable=True), - sa.Column('submitted', sa.DateTime), - sa.Column('finished', sa.DateTime, nullable=True), - sa.Column('status', sa.String(255))) # choices=JOB_STATUS_CHOICES, default='started' + sa.Column('id', sa.String(36), primary_key=True), + sa.Column('query', sa.Text), + sa.Column('description', sa.Text, nullable=True), + sa.Column('ordering', sa.Text, nullable=True), + sa.Column('submitted', sa.DateTime), + sa.Column('finished', sa.DateTime, nullable=True), + sa.Column('result_in_db', sa.Boolean), + sa.Column('status', sa.String(255))) # choices=JOB_STATUS_CHOICES """Part of the search job, run against a specific database and assigned to a specific consumer""" JobChunk = sa.Table('job_chunks', metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('job_id', sa.String(36), sa.ForeignKey('jobs.id')), - sa.Column('database', sa.String(255)), - sa.Column('submitted', sa.DateTime, nullable=True), - sa.Column('finished', sa.DateTime, nullable=True), - sa.Column('consumer', sa.ForeignKey('consumer.ip'), nullable=True), - sa.Column('status', sa.String(255))) # choices=JOB_CHUNK_STATUS_CHOICES, default='started' + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('job_id', sa.String(36), sa.ForeignKey('jobs.id')), + sa.Column('database', sa.String(255)), + sa.Column('submitted', sa.DateTime, nullable=True), + sa.Column('finished', sa.DateTime, nullable=True), + sa.Column('consumer', sa.ForeignKey('consumer.ip'), nullable=True), + sa.Column('status', sa.String(255))) # choices=JOB_CHUNK_STATUS_CHOICES, default='started' """Result of a specific JobChunk""" JobChunkResult = sa.Table('job_chunk_results', metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('job_chunk_id', None, sa.ForeignKey('job_chunks.id')), - sa.Column('rnacentral_id', sa.String(255)), - sa.Column('description', sa.Text, nullable=True), - sa.Column('score', sa.Float), - sa.Column('bias', sa.Float), - sa.Column('e_value', sa.Float), - sa.Column('target_length', sa.Integer), - sa.Column('alignment', sa.Text), - sa.Column('alignment_length', sa.Integer), - sa.Column('gap_count', sa.Integer), - sa.Column('match_count', sa.Integer), - sa.Column('nts_count1', sa.Integer), - sa.Column('nts_count2', sa.Integer), - sa.Column('identity', sa.Float), - sa.Column('query_coverage', sa.Float), - sa.Column('target_coverage', sa.Float), - sa.Column('gaps', sa.Float), - sa.Column('query_length', sa.Integer), - sa.Column('result_id', sa.Integer)) + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('job_chunk_id', None, sa.ForeignKey('job_chunks.id')), + sa.Column('rnacentral_id', sa.String(255)), + sa.Column('description', sa.Text, nullable=True), + sa.Column('score', sa.Float), + sa.Column('bias', sa.Float), + sa.Column('e_value', sa.Float), + sa.Column('target_length', sa.Integer), + sa.Column('alignment', sa.Text), + sa.Column('alignment_length', sa.Integer), + sa.Column('gap_count', sa.Integer), + sa.Column('match_count', sa.Integer), + sa.Column('nts_count1', sa.Integer), + sa.Column('nts_count2', sa.Integer), + sa.Column('identity', sa.Float), + sa.Column('query_coverage', sa.Float), + sa.Column('target_coverage', sa.Float), + sa.Column('gaps', sa.Float), + sa.Column('query_length', sa.Integer), + sa.Column('result_id', sa.Integer)) # Migrations @@ -160,6 +161,7 @@ async def migrate(ENVIRONMENT): ordering TEXT, submitted TIMESTAMP, finished TIMESTAMP, + result_in_db BOOLEAN, status VARCHAR(255)) ''') diff --git a/sequence_search/monitor/monitor.sh b/sequence_search/monitor/monitor.sh index d2c97948..f8681fa7 100755 --- a/sequence_search/monitor/monitor.sh +++ b/sequence_search/monitor/monitor.sh @@ -1,4 +1,5 @@ -# Script to regularly run a specific search and check the results +# Script to regularly run a search and check the results. +# Each job searches a different sequence with a different size to avoid retrieving database results. # -o allexport enables all following variable definitions to be exported. # +o allexport disables this feature. @@ -7,7 +8,11 @@ source $PWD/.conf-file set +o allexport CONTENT_TYPE="Content-Type: application/json" -DATABASE_AND_QUERY="{\"databases\": [\"mirbase\"], \"query\": \">sequence-search-test\nCUGUACUAUCUACUGUCUCUC\"}" +SIZE=$(( $RANDOM % 5 + 20 )) +NEW_SEQUENCE=$(cat /dev/urandom | env LC_CTYPE=C tr -dc ACGTU | head -c $SIZE) +DATABASES_LIST=("flybase" "gencode" "mirbase" "pdbe" "snopy" "srpdb") +DATABASE=${DATABASES_LIST[RANDOM%${#DATABASES_LIST[@]}]} +DATABASE_AND_QUERY="{\"databases\": [\"$DATABASE\"], \"query\": \">sequence-search-test\n$NEW_SEQUENCE\"}" # Run search on the correct target (test or default). # Ansible adds the correct floating_ip to the .conf-file @@ -25,11 +30,11 @@ if curl -s --head --request GET $HOST | grep "200 OK" > /dev/null; then PAUSE="0" if [ "$STATUS" == "started" ] || [ "$STATUS" == "pending" ] then - while [ $PAUSE -lt 30 ] + while [ $PAUSE -lt 300 ] do - sleep 60 + sleep 10 STATUS=$(curl -s ${HOST}/api/job-status/$JOB_ID | jq -r '.chunks | .[] | .status') - if [ "$STATUS" == "success" ] || [ "$STATUS" == "error" ] || [ "$STATUS" == "timeout" ] + if [ "$STATUS" == "success" ] || [ "$STATUS" = "partial_success" ] || [ "$STATUS" == "error" ] || [ "$STATUS" == "timeout" ] then break fi @@ -38,13 +43,12 @@ if curl -s --head --request GET $HOST | grep "200 OK" > /dev/null; then fi # Facets search - # Expected response: "hitCount = 23 and textSearchError = false" FACETS_SEARCH=$(curl -s ${HOST}/api/facets-search/$JOB_ID | jq '[.hitCount,.textSearchError]') HIT_COUNT=$(echo ${FACETS_SEARCH} | jq '.[0]') SEARCH_ERROR=$(echo ${FACETS_SEARCH} | jq '.[1]') # Send message in case of unexpected result - if [ "$STATUS" != "success" ] || [ "$HIT_COUNT" != "23" ] || [ "$SEARCH_ERROR" != "false" ] + if [ "$STATUS" != "success" ] || [ "$HIT_COUNT" -gt 0 ] && [ "$SEARCH_ERROR" != "false" ] || [ "$HIT_COUNT" == 0 ] && [ "$SEARCH_ERROR" != "true" ] then text="Ops! There is something wrong with the RNAcentral sequence search. Please check the links below: \n \n @@ -52,7 +56,7 @@ if curl -s --head --request GET $HOST | grep "200 OK" > /dev/null; then The job status is ${STATUS} (the expected status is success). \n \n To check the results see: ${HOST}/api/facets-search/$JOB_ID \n - The job results are ${HIT_COUNT} and ${SEARCH_ERROR} (the expected results are 23 and false)." + The job results are ${HIT_COUNT} and ${SEARCH_ERROR}." escapedText=$(echo ${text} | sed 's/"/\"/g' | sed "s/'/\'/g" ) json="{\"text\": \"$escapedText\"}" curl -s -d "payload=$json" $WEBHOOK_URL diff --git a/sequence_search/producer/views/submit_job.py b/sequence_search/producer/views/submit_job.py index 7e3028cf..8a009bbc 100644 --- a/sequence_search/producer/views/submit_job.py +++ b/sequence_search/producer/views/submit_job.py @@ -18,7 +18,7 @@ from sequence_search.db.models import JOB_CHUNK_STATUS_CHOICES from ...db.consumers import delegate_job_chunk_to_consumer, find_available_consumers -from ...db.jobs import save_job +from ...db.jobs import save_job, sequence_exists from ...db.job_chunks import save_job_chunk, set_job_chunk_status from ...consumer.rnacentral_databases import producer_validator, producer_to_consumers_databases @@ -105,43 +105,49 @@ async def submit_job(request): except (KeyError, TypeError, ValueError) as e: raise web.HTTPBadRequest(text=str(e)) from e - # save metadata about this job and job_chunks to the database - job_id = await save_job(request.app['engine'], data['query'], data['description']) - - databases = producer_to_consumers_databases(data['databases']) - for database in databases: - # save job_chunk with "created" status. This prevents the check_chunks_and_consumers function, - # which runs every 5 seconds, from executing the same job_chunk again. - await save_job_chunk(request.app['engine'], job_id, database) - - # TODO: what if Job was saved and JobChunk was not? Need transactions? - - consumers = await find_available_consumers(request.app['engine']) - - # if there are consumers available, delegate job_chunk to consumer - for index in range(min(len(consumers), len(databases))): - try: - await delegate_job_chunk_to_consumer( - engine=request.app['engine'], - consumer_ip=consumers[index].ip, - consumer_port=consumers[index].port, - job_id=job_id, - database=databases[index], - query=data['query'] - ) - except Exception as e: - return web.HTTPBadGateway(text=str(e)) - - # change job_chunk status to pending if no consumer is available - for index in range(len(consumers), len(databases)): - try: - await set_job_chunk_status( - request.app['engine'], - job_id, - databases[index], - status=JOB_CHUNK_STATUS_CHOICES.pending - ) - except Exception as e: - return web.HTTPBadGateway(text=str(e)) - - return web.json_response({"job_id": job_id}, status=201) + # perform the search or get the data from the database? + job_id = await sequence_exists(request.app['engine'], data['query']) + + if job_id: + return web.json_response({"job_id": job_id}, status=201) + else: + # save metadata about this job and job_chunks to the database + job_id = await save_job(request.app['engine'], data['query'], data['description']) + + databases = producer_to_consumers_databases(data['databases']) + for database in databases: + # save job_chunk with "created" status. This prevents the check_chunks_and_consumers function, + # which runs every 5 seconds, from executing the same job_chunk again. + await save_job_chunk(request.app['engine'], job_id, database) + + # TODO: what if Job was saved and JobChunk was not? Need transactions? + + consumers = await find_available_consumers(request.app['engine']) + + # if there are consumers available, delegate job_chunk to consumer + for index in range(min(len(consumers), len(databases))): + try: + await delegate_job_chunk_to_consumer( + engine=request.app['engine'], + consumer_ip=consumers[index].ip, + consumer_port=consumers[index].port, + job_id=job_id, + database=databases[index], + query=data['query'] + ) + except Exception as e: + return web.HTTPBadGateway(text=str(e)) + + # change job_chunk status to pending if no consumer is available + for index in range(len(consumers), len(databases)): + try: + await set_job_chunk_status( + request.app['engine'], + job_id, + databases[index], + status=JOB_CHUNK_STATUS_CHOICES.pending + ) + except Exception as e: + return web.HTTPBadGateway(text=str(e)) + + return web.json_response({"job_id": job_id}, status=201)