chore: merge state.py and finish_callback() into services.py, #56
jpmckinney committed Jan 18, 2023
1 parent: 7bbe146 · commit: 1a08cba
Showing 10 changed files with 133 additions and 138 deletions.
3 changes: 1 addition & 2 deletions contracting_process/processor.py
@@ -9,8 +9,7 @@
 from tools import settings
 from tools.getter import get_values
 from tools.helpers import is_step_required
-from tools.services import get_cursor
-from tools.state import set_items_state, state
+from tools.services import get_cursor, set_items_state, state

 logger = logging.getLogger("pelican.contracting_process.processor")
3 changes: 1 addition & 2 deletions manage.py
@@ -4,8 +4,7 @@
 import click

 from tools import exchange_rates_db, settings
-from tools.services import commit, get_cursor, publish
-from tools.state import phase, set_dataset_state, state
+from tools.services import commit, get_cursor, phase, publish, set_dataset_state, state


 @click.group()
21 changes: 1 addition & 20 deletions tools/helpers.py
@@ -1,11 +1,7 @@
 import random
-from typing import Any, List, Optional
-
-from yapw.methods.blocking import ack, publish
+from typing import Any, List

 from tools import settings
-from tools.services import commit
-from tools.state import set_dataset_state, state


 class ReservoirSampler:
@@ -33,18 +29,3 @@ def retrieve_samples(self) -> List[Any]:

 def is_step_required(*steps: str) -> bool:
     return any(step in settings.STEPS for step in steps)
-
-
-# Has affinity with services.py, but would result in circular dependency due to `set_dataset_state()`.
-def finish_callback(
-    client_state, channel, method, dataset_id: int, phase: Optional[str] = None, routing_key: Optional[str] = None
-) -> None:
-    """
-    Update the dataset's state, publish a message if a routing key is provided, and ack the message.
-    """
-    if phase:
-        set_dataset_state(dataset_id, state.OK, phase)
-        commit()
-    if routing_key:
-        publish(client_state, channel, {"dataset_id": dataset_id}, routing_key)
-    ack(client_state, channel, method.delivery_tag)
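The comment on the removed finish_callback() records why this merge happened: the function belongs with services.py, but it calls set_dataset_state(), which lived in state.py, and state.py in turn imported get_cursor() and commit() from services.py, so moving the function alone would have closed an import cycle. A sketch of the tangle this commit dissolves (module names are from the diff; the diagram is illustrative, not code from the repo):

    # Before: moving finish_callback() into services.py alone would create a cycle.
    #   helpers.py   imports      services.py (commit) and state.py (set_dataset_state)
    #   state.py     imports      services.py (get_cursor, commit)
    #   services.py  would need   state.py (set_dataset_state)   <-- cycle
    # After: state.py's contents live in services.py, so finish_callback()
    # calls set_dataset_state() as a sibling function, with no second module.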
124 changes: 123 additions & 1 deletion tools/services.py
@@ -1,11 +1,12 @@
 import logging
-from typing import Any, Optional
+from typing import Any, List, Optional, Tuple

 import pika.exceptions
 import psycopg2.extensions
 import psycopg2.extras
 import simplejson as json
 from yapw import clients
+from yapw.methods import blocking

 from tools import settings

@@ -15,6 +16,9 @@
 logger = logging.getLogger(__name__)


+# RabbitMQ
+
+
 class Consumer(clients.Threaded, clients.Durable, clients.Blocking, clients.Base):
     pass
@@ -60,6 +64,26 @@ def publish(*args, **kwargs):
         client.close()


+# RabbitMQ + PostgreSQL
+
+
+def finish_callback(
+    client_state, channel, method, dataset_id: int, phase: Optional[str] = None, routing_key: Optional[str] = None
+) -> None:
+    """
+    Update the dataset's state, publish a message if a routing key is provided, and ack the message.
+    """
+    if phase:
+        set_dataset_state(dataset_id, state.OK, phase)
+        commit()
+    if routing_key:
+        blocking.publish(client_state, channel, {"dataset_id": dataset_id}, routing_key)
+    blocking.ack(client_state, channel, method.delivery_tag)
+
+
+# PostgreSQL
+
+
 def get_cursor() -> psycopg2.extensions.cursor:
     global db_connected, db_connection
     if not db_connected:
@@ -75,3 +99,101 @@ def commit() -> None:

 def rollback() -> None:
     db_connection.rollback()
+
+
+class state:
+    IN_PROGRESS = "IN_PROGRESS"
+    OK = "OK"
+
+
+class phase:
+    CONTRACTING_PROCESS = "CONTRACTING_PROCESS"
+    DATASET = "DATASET"
+    TIME_VARIANCE = "TIME_VARIANCE"
+    CHECKED = "CHECKED"
+    DELETED = "DELETED"
+
+
+def set_dataset_state(dataset_id: int, state: str, phase: str, size: Optional[int] = None) -> None:
+    """
+    Upsert a dataset's progress to the given state and phase.
+
+    :param dataset_id: the dataset's ID
+    :param state: the state to set
+    :param phase: the phase to be set
+    :param size: number of data items to process
+    """
+    if size:
+        sql = """\
+            INSERT INTO progress_monitor_dataset (dataset_id, state, phase, size)
+            VALUES (%(dataset_id)s, %(state)s, %(phase)s, %(size)s)
+            ON CONFLICT (dataset_id)
+            DO UPDATE SET state = %(state)s, phase = %(phase)s, size = %(size)s, modified = now()
+        """
+    else:
+        sql = """\
+            INSERT INTO progress_monitor_dataset (dataset_id, state, phase, size)
+            VALUES (%(dataset_id)s, %(state)s, %(phase)s, %(size)s)
+            ON CONFLICT (dataset_id)
+            DO UPDATE SET state = %(state)s, phase = %(phase)s, modified = now()
+        """
+    with get_cursor() as cursor:
+        cursor.execute(sql, {"dataset_id": dataset_id, "state": state, "phase": phase, "size": size})
+
+
+def set_items_state(dataset_id: int, item_ids: List[int], state: str) -> None:
+    """
+    Upsert data items' progress to the given state.
+
+    :param dataset_id: the dataset's ID
+    :param item_ids: the data items' IDs
+    :param state: the state to set
+    """
+    with get_cursor() as cursor:
+        sql = """\
+            INSERT INTO progress_monitor_item (dataset_id, item_id, state)
+            VALUES %s
+            ON CONFLICT (dataset_id, item_id)
+            DO UPDATE SET state = excluded.state, modified = now()
+        """
+        psycopg2.extras.execute_values(cursor, sql, [(dataset_id, item_id, state) for item_id in item_ids])
+
+
+def get_processed_items_count(dataset_id: int) -> int:
+    """
+    Return the number of items processed.
+
+    :param dataset_id: the dataset's ID
+    """
+    with get_cursor() as cursor:
+        cursor.execute(
+            "SELECT COUNT(*) cnt FROM progress_monitor_item WHERE dataset_id = %(dataset_id)s AND state = %(state)s",
+            {"dataset_id": dataset_id, "state": state.OK},
+        )
+        return cursor.fetchone()["cnt"]
+
+
+def get_total_items_count(dataset_id: int) -> int:
+    """
+    Return the number of items to process.
+
+    :param dataset_id: the dataset's ID
+    """
+    with get_cursor() as cursor:
+        cursor.execute(
+            "SELECT size FROM progress_monitor_dataset WHERE dataset_id = %(dataset_id)s", {"dataset_id": dataset_id}
+        )
+        return cursor.fetchone()["size"]
+
+
+def get_dataset_progress(dataset_id: int) -> Tuple[Any, ...]:
+    """
+    Return the dataset's progress.
+
+    :param dataset_id: the dataset's ID
+    """
+    with get_cursor() as cursor:
+        cursor.execute(
+            "SELECT * FROM progress_monitor_dataset WHERE dataset_id = %(dataset_id)s", {"dataset_id": dataset_id}
+        )
+        return cursor.fetchone()
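With the merge, a worker can drive the whole progress lifecycle from a single import. A minimal usage sketch, not code from this commit: the callback signature follows the yapw blocking-consumer convention implied by finish_callback()'s parameters, and the item IDs and the "checker" routing key are hypothetical:

    from tools.services import commit, finish_callback, phase, set_dataset_state, set_items_state, state


    def callback(client_state, channel, method, properties, input_message):
        dataset_id = input_message["dataset_id"]

        # Record that work on this phase has started.
        set_dataset_state(dataset_id, state.IN_PROGRESS, phase.CONTRACTING_PROCESS)
        commit()

        # ... process the data items, then record per-item progress ...
        set_items_state(dataset_id, [1, 2, 3], state.OK)  # hypothetical item IDs
        commit()

        # Set the dataset's state to OK, publish {"dataset_id": ...} to the next
        # worker, and ack the consumed message.
        finish_callback(
            client_state, channel, method, dataset_id, phase=phase.CONTRACTING_PROCESS, routing_key="checker"
        )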
103 changes: 0 additions & 103 deletions tools/state.py

This file was deleted.

5 changes: 3 additions & 2 deletions workers/check/dataset.py
@@ -8,8 +8,9 @@
 from tools import settings
 from tools.currency_converter import bootstrap
 from tools.helpers import finish_callback, is_step_required
-from tools.services import commit, consume
-from tools.state import (
+from tools.services import (
+    commit,
+    consume,
     get_dataset_progress,
     get_processed_items_count,
     get_total_items_count,
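The three getters imported here are enough to gate dataset-level work on per-item completion, which appears to be this worker's role. A sketch under that assumption (all_items_processed is a hypothetical helper name):

    from tools.services import get_processed_items_count, get_total_items_count


    def all_items_processed(dataset_id: int) -> bool:
        # True once the count of items in the OK state matches the dataset's declared size.
        return get_processed_items_count(dataset_id) == get_total_items_count(dataset_id)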
3 changes: 1 addition & 2 deletions workers/check/time_based.py
@@ -4,8 +4,7 @@
 from time_variance import processor
 from tools import settings
 from tools.helpers import finish_callback, is_step_required
-from tools.services import commit, consume
-from tools.state import phase, set_dataset_state, state
+from tools.services import commit, consume, phase, set_dataset_state, state

 consume_routing_key = "dataset_checker"
 routing_key = "time_variance_checker"
3 changes: 1 addition & 2 deletions workers/extract/dataset_filter.py
@@ -8,8 +8,7 @@
 from yapw.methods.blocking import ack, nack, publish

 from tools import settings
-from tools.services import commit, consume, get_cursor
-from tools.state import phase, set_dataset_state, set_items_state, state
+from tools.services import commit, consume, get_cursor, phase, set_dataset_state, set_items_state, state

 consume_routing_key = "dataset_filter_extractor_init"
 routing_key = "extractor"
3 changes: 1 addition & 2 deletions workers/extract/kingfisher_process.py
@@ -9,8 +9,7 @@

 import dataset.meta_data_aggregator as meta_data_aggregator
 from tools import exchange_rates_db, settings
-from tools.services import commit, consume, get_cursor
-from tools.state import phase, set_dataset_state, set_items_state, state
+from tools.services import commit, consume, get_cursor, phase, set_dataset_state, set_items_state, state

 consume_routing_key = "ocds_kingfisher_extractor_init"
 routing_key = "extractor"
3 changes: 1 addition & 2 deletions workers/report.py
@@ -9,8 +9,7 @@
 from dataset import meta_data_aggregator
 from tools import settings
 from tools.helpers import finish_callback, is_step_required
-from tools.services import consume
-from tools.state import phase
+from tools.services import consume, phase

 consume_routing_key = "time_variance_checker"
 logger = logging.getLogger("pelican.workers.report")
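workers/report.py consumes "time_variance_checker" and appears to sit at the end of the pipeline, with nothing further to publish. Called without routing_key, finish_callback() only updates the dataset's state and acks. A hedged sketch (phase.CHECKED comes from the new phase class; importing finish_callback from tools.services follows this commit's move):

    from tools.services import finish_callback, phase


    def callback(client_state, channel, method, properties, input_message):
        dataset_id = input_message["dataset_id"]
        # ... aggregate metadata and write the report ...
        # No routing_key: set the dataset's state to OK in the CHECKED phase, then ack.
        finish_callback(client_state, channel, method, dataset_id, phase=phase.CHECKED)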
